Is there any example that uses the HBase connector in the stream API

2023-09-21 Thread 碳酸君
hi community:

 I'm trying to write some data to HBase in a streaming job, with
flink-connector-hbase-2.2.
 I have no idea how to instantiate
org.apache.flink.connector.hbase.util.HBaseTableSchema and
org.apache.flink.connector.hbase.sink.HBaseMutationConverter.
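
 My best guess so far looks roughly like this. This is only a sketch: MyEvent,
the column family/qualifier and the constructor argument values are
placeholders/guesses, not verified against flink-connector-hbase-2.2, and as
far as I can tell HBaseTableSchema is mainly used by the Table/SQL connector,
so a plain DataStream sink may only need an HBaseMutationConverter:

// Hedged sketch only; MyEvent is a placeholder POJO and the converter should be
// a serializable top-level class in real code. Classes used:
// org.apache.flink.connector.hbase.sink.{HBaseSinkFunction, HBaseMutationConverter},
// org.apache.hadoop.hbase.client.{Put, Mutation}, org.apache.hadoop.hbase.util.Bytes.
HBaseMutationConverter<MyEvent> converter = new HBaseMutationConverter<MyEvent>() {
    @Override
    public void open() {
        // nothing to initialize in this simple converter
    }

    @Override
    public Mutation convertToMutation(MyEvent record) {
        // map one record to one Put: row key + a single cell in column family "cf"
        Put put = new Put(Bytes.toBytes(record.getKey()));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("value"),
                Bytes.toBytes(record.getValue()));
        return put;
    }
};

org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set("hbase.zookeeper.quorum", "zk-host:2181");   // placeholder

HBaseSinkFunction<MyEvent> sink = new HBaseSinkFunction<>(
        "my_table",        // HBase table name (placeholder)
        hbaseConf,
        converter,
        2 * 1024 * 1024,   // bufferFlushMaxSizeInBytes (guess)
        1000,              // bufferFlushMaxMutations (guess)
        1000);             // bufferFlushIntervalMillis (guess)

stream.addSink(sink);      // stream is a DataStream<MyEvent> built earlier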


Regards,
Teii


Re: Window aggregation on two joined tables

2023-09-21 Thread Eugenio Marotti
Thank you. 
I’ll try to configure a watermark on the second table.
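
Probably something along these lines, assuming I add a (currently hypothetical)
event-time column, e.g. ‘data_aggiornamento’, to the report topic, mirroring
what I already do for the first table:

final TableDescriptor averageJudgeByPhaseReportTableDescriptor = 
TableDescriptor.forConnector("upsert-kafka")
   .schema(Schema.newBuilder()
     .column("giudice", DataTypes.STRING().notNull())
     .column("fase", DataTypes.STRING().notNull())
     .column("media_mobile", DataTypes.BIGINT())
     .column("data_aggiornamento", DataTypes.TIMESTAMP_LTZ(3))   // hypothetical column
     .watermark("data_aggiornamento", "data_aggiornamento - INTERVAL '1' SECOND")
     .primaryKey("giudice", "fase")
     .build())
   .option(KafkaConnectorOptions.TOPIC, 
List.of("sicid.processor.average-judge-by-phase-report"))
   .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
   .option(KafkaConnectorOptions.KEY_FORMAT, "json")
   .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
   .build();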

Eugenio

> Il giorno 21 set 2023, alle ore 16:30, Schwalbe Matthias 
>  ha scritto:
> 
> … well yes and no:
> If the second table is a small table used for enrichment, you can also mark 
> it as broadcast table, but I don’t know how to do that on table API
> If the second table has significant data and significant updates, then you need 
> to configure watermarking/event time semantics on the second table as well
> The logic is this:
> Your join operator only generates output windows once the event time passes 
> by the end of the time window
> The event time/watermark time of your join operator is the minimum watermark 
> time of all inputs
> Because your second table does not emit watermarks, its watermark time 
> remains at Long.MinValue, hence also the operator time stays there
> Another way to make progress, in case your second table does not update 
> watermarks/data often enough, is to mark the source with an idle watermark 
> generator, in which case it is treated as ‘timeless’ and does not prevent 
> time progress in your join operator
> Again, not sure how to configure this
>  
> Ancora cari saluti
>  
> Thias
>  
>  
>  
>  
>  
> From: Eugenio Marotti  
> Sent: Thursday, September 21, 2023 2:35 PM
> To: Schwalbe Matthias 
> Cc: user@flink.apache.org
> Subject: Re: Window aggregation on two joined tables
>  
> Hi Matthias,
>  
> No, the second table doesn’t have an event time or a watermark specified. In 
> order for the window to work, do I need a watermark on the second table as well?
>  
> Thanks
> Eugenio
> 
> 
> Il giorno 21 set 2023, alle ore 13:45, Schwalbe Matthias 
> mailto:matthias.schwa...@viseca.ch>> ha scritto:
>  
> Ciao Eugenio,
>  
> I might be mistaken, but did you specify the event time for the second table 
> like you did for the first table (watermark(….))?
> I am not so acquainted with the Table API (doing more straight DataStream API 
> work), but I assume this join and windowing should be by event time.
>  
> What do you think?
>  
> Cari saluti
>  
> Thias
>  
>  
> From: Eugenio Marotti  > 
> Sent: Thursday, September 21, 2023 8:56 AM
> To: user@flink.apache.org 
> Subject: Window aggregation on two joined tables
>  
> Hi,
>  
> I’m trying to execute a window aggregation on two joined tables from two Kafka 
> topics (upsert fashion), but I get no output. Here’s the code I’m using:
>  
> This is the first table from Kafka with an event time watermark on 
> ‘data_fine’ attribute:
>  
> final TableDescriptor phasesDurationsTableDescriptor = 
> TableDescriptor.forConnector("upsert-kafka")
>.schema(Schema.newBuilder()
>  .column("id_fascicolo", DataTypes.BIGINT().notNull())
>  .column("nrg", DataTypes.STRING())
>  .column("giudice", DataTypes.STRING())
>  .column("oggetto", DataTypes.STRING())
>  .column("codice_oggetto", DataTypes.STRING())
>  .column("ufficio", DataTypes.STRING())
>  .column("sezione", DataTypes.STRING())
>  .column("fase_completata", DataTypes.BOOLEAN())
>  .column("fase", DataTypes.STRING().notNull())
>  .column("durata", DataTypes.BIGINT())
>  .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
>  .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
>  .watermark("data_inizio", "data_inizio - INTERVAL '1' SECOND")
>  .primaryKey("id_fascicolo", "fase")
>  .build())
>.option(KafkaConnectorOptions.TOPIC, 
> List.of("sicid.processor.phases-durations"))
>.option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>.option(KafkaConnectorOptions.KEY_FORMAT, "json")
>.option(KafkaConnectorOptions.VALUE_FORMAT, "json")
>.build();
> tEnv.createTable("PhasesDurations_Kafka", phasesDurationsTableDescriptor);
> Table phasesDurationsTable = tEnv.from("PhasesDurations_Kafka");
> Here’s the second table:
> final TableDescriptor averageJudgeByPhaseReportTableDescriptor = 
> TableDescriptor.forConnector("upsert-kafka")
>.schema(Schema.newBuilder()
>  .column("giudice", DataTypes.STRING().notNull())
>  .column("fase", DataTypes.STRING().notNull())
>  .column("media_mobile", DataTypes.BIGINT())
>  .primaryKey("giudice", "fase")
>  .build())
>.option(KafkaConnectorOptions.TOPIC, 
> List.of("sicid.processor.average-judge-by-phase-report"))
>.option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>.option(KafkaConnectorOptions.KEY_FORMAT, "json")
>.option(KafkaConnectorOptions.VALUE_FORMAT, "json")
>.option(KafkaConnectorOptions.PROPS_GROUP_ID, 
> "average-judge-by-phase-report")
>.build();
> tEnv.createTable("AverageJudgeByPhaseReport_Kafka", 
> averageJudgeByPhaseReportTableDescriptor);
> Table average

Re: Zookeeper HA with Kubernetes: Possible to use the same Zookeeper cluster w/multiple Flink Operators?

2023-09-21 Thread Brian King
Gyula,

Thanks, you've helped us move closer to migrating our application to the Flink 
Operator![0]

Best,

Brian King
SRE, Data Platform/Search Platform
Wikimedia Foundation
IRC: inflatador

[0] https://phabricator.wikimedia.org/T326409




> On Sep 21, 2023, at 12:16 AM, Gyula Fóra  wrote:
> 
> Hi!
> 
> The cluster-id for each FlinkDeployment is simply the name of the deployment. 
> So they are all different in a given namespace. (In other words they are not 
> fixed as your question suggests but set automatically)
> 
> So there should be no problem sharing the ZK cluster.
> 
> Cheers
> Gyula
> 
> On Thu, 21 Sep 2023 at 03:46, Brian King  > wrote:
> Hello Flink Users!
> 
> We're attempting to deploy a Flink application cluster on Kubernetes, using 
> the Flink Operator and Zookeeper for HA.
> 
> We're using Flink 1.16 and I have a question about some of the Zookeeper 
> configuration[0]:
> 
> "high-availability.zookeeper.path.root" is described as "The root ZooKeeper 
> node, under which all cluster nodes are placed"
> 
> "high-availability.cluster-id" , which says "important: customize per 
> cluster." But it also says "you should not set this value manually when 
> running on [...] native Kubernetes [...]in those cases a cluster-id is [...] 
> automatically generated."
> 
> Our design calls for multiple Flink application clusters managed by the same 
> Flink Operator, and using the same Zookeeper quorum for each Flink 
> Application cluster. Will the Flink Operator be able to handle this, or will 
> the different clusters collide due to the fixed 
> "high-availability.cluster-id" value? Is it possible to avoid this by setting 
> a unique "high-availability.zookeeper.path.root" for each application cluster?
> 
> Thanks for your time. I'm new to Flink, so I apologize if I did not explain 
> myself clearly. Please let me know if you need additional information.
> 
> Best,
> 
> Brian King
> SRE, Data Platform/Search Platform
> Wikimedia Foundation
> IRC: inflatador
> 
> [0] 
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/ha/zookeeper_ha/#configuration



Completable Future in RichSinkFunction with Retry

2023-09-21 Thread patricia lee
I initially used the generic base sink / the RichAsyncFunction, but these
two were not applicable to my use case.

I implemented a CompletableFuture that sends data via sendBatch() to the
vendor API.


Is there a built-in API that supports retries with a custom method in a
RichSinkFunction?
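
For now I retry manually around the CompletableFuture, roughly like this
(VendorClient, sendBatch() and MyBatch are placeholders for the vendor API;
the retry loop is just a sketch, not a built-in Flink facility; it uses
org.apache.flink.streaming.api.functions.sink.RichSinkFunction,
org.apache.flink.configuration.Configuration and
java.util.concurrent.CompletableFuture):

public class VendorSink extends RichSinkFunction<MyBatch> {

    private static final int MAX_RETRIES = 3;
    private transient VendorClient client;   // placeholder vendor API client

    @Override
    public void open(Configuration parameters) {
        client = new VendorClient();
    }

    @Override
    public void invoke(MyBatch batch, Context context) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            try {
                // sendBatch() is assumed to return a CompletableFuture<Void>;
                // join() blocks the sink so that failures surface here
                CompletableFuture<Void> future = client.sendBatch(batch);
                future.join();
                return;
            } catch (Exception e) {
                last = e;
                Thread.sleep(1000L * attempt);   // simple linear backoff
            }
        }
        throw new RuntimeException(
            "sendBatch failed after " + MAX_RETRIES + " attempts", last);
    }
}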



Regards,
Pat


RE: Window aggregation on two joined tables

2023-09-21 Thread Schwalbe Matthias
… well yes and no:

  *   If the second table is a small table used for enrichment, you can also 
mark it as broadcast table, but I don’t know how to do that on table API
  *   If the second table has significant data and significant updates, then you 
need to configure watermarking/event time semantics on the second table as well
  *   The logic is this:
 *   Your join operator only generates output windows once the event time 
passes by the end of the time window
 *   The event time/watermark time of your join operator is the minimum 
watermark time of all inputs
 *   Because your second table does not emit watermarks, its watermark time 
 remains at Long.MinValue, hence also the operator time stays there
  *   Another way to make progress, in case your second table does not update 
watermarks/data often enough, is to mark the source with an idle watermark 
generator, in which case it is treated as ‘timeless’ and does not prevent time 
progress in your join operator
 *   Again, not sure how to configure this (a hedged sketch follows below)
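
For what it’s worth, a hedged sketch of the two idleness knobs I’m aware of
(please verify that they actually apply to an upsert-kafka source; MyEvent is a
placeholder type, and the DataStream variant needs
org.apache.flink.api.common.eventtime.WatermarkStrategy and java.time.Duration):

// Table API: mark a source idle after a timeout so it stops holding back the
// watermark of downstream operators
tEnv.getConfig().getConfiguration()
        .setString("table.exec.source.idle-timeout", "30 s");

// DataStream API equivalent, set on the WatermarkStrategy of the source
WatermarkStrategy
        .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1))
        .withIdleness(Duration.ofSeconds(30));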

Ancora cari saluti

Thias





From: Eugenio Marotti 
Sent: Thursday, September 21, 2023 2:35 PM
To: Schwalbe Matthias 
Cc: user@flink.apache.org
Subject: Re: Window aggregation on two joined tables

Hi Matthias,

No, the second table doesn’t have an event time or a watermark specified. In 
order for the window to work, do I need a watermark on the second table as well?

Thanks
Eugenio


Il giorno 21 set 2023, alle ore 13:45, Schwalbe Matthias 
mailto:matthias.schwa...@viseca.ch>> ha scritto:

Ciao Eugenio,

I might be mistaken, but did you specify the event time for the second table 
like you did for the first table (watermark(….))?
I am not so acquainted with the Table API (doing more straight DataStream API 
work), but I assume this join and windowing should be by event time.

What do you think?

Cari saluti

Thias


From: Eugenio Marotti 
mailto:ing.eugenio.maro...@gmail.com>>
Sent: Thursday, September 21, 2023 8:56 AM
To: user@flink.apache.org
Subject: Window aggregation on two joined tables

Hi,

I’m trying to execute a window aggregation on two joined tables from two Kafka 
topics (upsert fashion), but I get no output. Here’s the code I’m using:

This is the first table from Kafka with an event time watermark on ‘data_fine’ 
attribute:


final TableDescriptor phasesDurationsTableDescriptor = 
TableDescriptor.forConnector("upsert-kafka")
   .schema(Schema.newBuilder()
 .column("id_fascicolo", DataTypes.BIGINT().notNull())
 .column("nrg", DataTypes.STRING())
 .column("giudice", DataTypes.STRING())
 .column("oggetto", DataTypes.STRING())
 .column("codice_oggetto", DataTypes.STRING())
 .column("ufficio", DataTypes.STRING())
 .column("sezione", DataTypes.STRING())
 .column("fase_completata", DataTypes.BOOLEAN())
 .column("fase", DataTypes.STRING().notNull())
 .column("durata", DataTypes.BIGINT())
 .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
 .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
 .watermark("data_inizio", "data_inizio - INTERVAL '1' SECOND")
 .primaryKey("id_fascicolo", "fase")
 .build())
   .option(KafkaConnectorOptions.TOPIC, 
List.of("sicid.processor.phases-durations"))
   .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
   .option(KafkaConnectorOptions.KEY_FORMAT, "json")
   .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
   .build();
tEnv.createTable("PhasesDurations_Kafka", phasesDurationsTableDescriptor);
Table phasesDurationsTable = tEnv.from("PhasesDurations_Kafka");

Here’s the second table:

final TableDescriptor averageJudgeByPhaseReportTableDescriptor = 
TableDescriptor.forConnector("upsert-kafka")
   .schema(Schema.newBuilder()
 .column("giudice", DataTypes.STRING().notNull())
 .column("fase", DataTypes.STRING().notNull())
 .column("media_mobile", DataTypes.BIGINT())
 .primaryKey("giudice", "fase")
 .build())
   .option(KafkaConnectorOptions.TOPIC, 
List.of("sicid.processor.average-judge-by-phase-report"))
   .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
   .option(KafkaConnectorOptions.KEY_FORMAT, "json")
   .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
   .option(KafkaConnectorOptions.PROPS_GROUP_ID, 
"average-judge-by-phase-report")
   .build();
tEnv.createTable("AverageJudgeByPhaseReport_Kafka", 
averageJudgeByPhaseReportTableDescriptor);
Table averageJudgeByPhaseReportTable = 
tEnv.from("AverageJudgeByPhaseReport_Kafka");

Table renamedAverageJudgeByPhaseReportTable = averageJudgeByPhaseReportTable
   .select(
 $("giudice").as("giudice_media"),
 $("fase").as("fase_media"),
 $("media_mobile")
   );



And here’s the code I’m experimenting with:

phasesD

Re: Window aggregation on two joined tables

2023-09-21 Thread Eugenio Marotti
Hi Matthias,

No, the second table doesn’t have an event time or a watermark specified. In 
order for the window to work, do I need a watermark on the second table as well?

Thanks
Eugenio

> Il giorno 21 set 2023, alle ore 13:45, Schwalbe Matthias 
>  ha scritto:
> 
> Ciao Eugenio,
>  
> I might be mistaken, but did you specify the event time for the second table 
> like you did for the first table (watermark(….))?
> I am not so acquainted with the Table API (doing more straight DataStream API 
> work), but I assume this join and windowing should be by event time.
>  
> What do you think?
>  
> Cari saluti
>  
> Thias
>  
>  
> From: Eugenio Marotti  
> Sent: Thursday, September 21, 2023 8:56 AM
> To: user@flink.apache.org
> Subject: Window aggregation on two joined tables
>  
> Hi,
>  
> I’m trying to execute a window aggregation on two joined tables from two Kafka 
> topics (upsert fashion), but I get no output. Here’s the code I’m using:
>  
> This is the first table from Kafka with an event time watermark on 
> ‘data_fine’ attribute:
>  
> final TableDescriptor phasesDurationsTableDescriptor = 
> TableDescriptor.forConnector("upsert-kafka")
>.schema(Schema.newBuilder()
>  .column("id_fascicolo", DataTypes.BIGINT().notNull())
>  .column("nrg", DataTypes.STRING())
>  .column("giudice", DataTypes.STRING())
>  .column("oggetto", DataTypes.STRING())
>  .column("codice_oggetto", DataTypes.STRING())
>  .column("ufficio", DataTypes.STRING())
>  .column("sezione", DataTypes.STRING())
>  .column("fase_completata", DataTypes.BOOLEAN())
>  .column("fase", DataTypes.STRING().notNull())
>  .column("durata", DataTypes.BIGINT())
>  .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
>  .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
>  .watermark("data_inizio", "data_inizio - INTERVAL '1' SECOND")
>  .primaryKey("id_fascicolo", "fase")
>  .build())
>.option(KafkaConnectorOptions.TOPIC, 
> List.of("sicid.processor.phases-durations"))
>.option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>.option(KafkaConnectorOptions.KEY_FORMAT, "json")
>.option(KafkaConnectorOptions.VALUE_FORMAT, "json")
>.build();
> tEnv.createTable("PhasesDurations_Kafka", phasesDurationsTableDescriptor);
> Table phasesDurationsTable = tEnv.from("PhasesDurations_Kafka");
> Here’s the second table:
> final TableDescriptor averageJudgeByPhaseReportTableDescriptor = 
> TableDescriptor.forConnector("upsert-kafka")
>.schema(Schema.newBuilder()
>  .column("giudice", DataTypes.STRING().notNull())
>  .column("fase", DataTypes.STRING().notNull())
>  .column("media_mobile", DataTypes.BIGINT())
>  .primaryKey("giudice", "fase")
>  .build())
>.option(KafkaConnectorOptions.TOPIC, 
> List.of("sicid.processor.average-judge-by-phase-report"))
>.option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>.option(KafkaConnectorOptions.KEY_FORMAT, "json")
>.option(KafkaConnectorOptions.VALUE_FORMAT, "json")
>.option(KafkaConnectorOptions.PROPS_GROUP_ID, 
> "average-judge-by-phase-report")
>.build();
> tEnv.createTable("AverageJudgeByPhaseReport_Kafka", 
> averageJudgeByPhaseReportTableDescriptor);
> Table averageJudgeByPhaseReportTable = 
> tEnv.from("AverageJudgeByPhaseReport_Kafka");
> 
> Table renamedAverageJudgeByPhaseReportTable = averageJudgeByPhaseReportTable
>.select(
>  $("giudice").as("giudice_media"),
>  $("fase").as("fase_media"),
>  $("media_mobile")
>);
>  
> And here’s the code I’m experimenting with:
> phasesDurationsTable
>.join(renamedAverageJudgeByPhaseReportTable)
>.where($("giudice").isEqual($("giudice_media")))
>.window(Tumble.over(lit(30).days()).on($("data_inizio")).as("w"))
>.groupBy(
>  $("giudice"),
>  $("w")
>)
>.select(
>  $("giudice")
>)
>.execute().print();
>  
> Am I doing something wrong?

RE: Window aggregation on two joined tables

2023-09-21 Thread Schwalbe Matthias
Ciao Eugenio,

I might be mistaken, but did you specify the event time for the second table 
like you did for the first table (watermark(….))?
I am not so acquainted with the Table API (doing more straight DataStream API 
work), but I assume this join and windowing should be by event time.

What do you think?

Cari saluti

Thias


From: Eugenio Marotti 
Sent: Thursday, September 21, 2023 8:56 AM
To: user@flink.apache.org
Subject: Window aggregation on two joined tables

Hi,

I’m trying to execute a window aggregation on two joined tables from two Kafka 
topics (upsert fashion), but I get no output. Here’s the code I’m using:

This is the first table from Kafka with an event time watermark on ‘data_fine’ 
attribute:


final TableDescriptor phasesDurationsTableDescriptor = 
TableDescriptor.forConnector("upsert-kafka")
   .schema(Schema.newBuilder()
 .column("id_fascicolo", DataTypes.BIGINT().notNull())
 .column("nrg", DataTypes.STRING())
 .column("giudice", DataTypes.STRING())
 .column("oggetto", DataTypes.STRING())
 .column("codice_oggetto", DataTypes.STRING())
 .column("ufficio", DataTypes.STRING())
 .column("sezione", DataTypes.STRING())
 .column("fase_completata", DataTypes.BOOLEAN())
 .column("fase", DataTypes.STRING().notNull())
 .column("durata", DataTypes.BIGINT())
 .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
 .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
 .watermark("data_inizio", "data_inizio - INTERVAL '1' SECOND")
 .primaryKey("id_fascicolo", "fase")
 .build())
   .option(KafkaConnectorOptions.TOPIC, 
List.of("sicid.processor.phases-durations"))
   .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
   .option(KafkaConnectorOptions.KEY_FORMAT, "json")
   .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
   .build();
tEnv.createTable("PhasesDurations_Kafka", phasesDurationsTableDescriptor);
Table phasesDurationsTable = tEnv.from("PhasesDurations_Kafka");

Here’s the second table:

final TableDescriptor averageJudgeByPhaseReportTableDescriptor = 
TableDescriptor.forConnector("upsert-kafka")
   .schema(Schema.newBuilder()
 .column("giudice", DataTypes.STRING().notNull())
 .column("fase", DataTypes.STRING().notNull())
 .column("media_mobile", DataTypes.BIGINT())
 .primaryKey("giudice", "fase")
 .build())
   .option(KafkaConnectorOptions.TOPIC, 
List.of("sicid.processor.average-judge-by-phase-report"))
   .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
   .option(KafkaConnectorOptions.KEY_FORMAT, "json")
   .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
   .option(KafkaConnectorOptions.PROPS_GROUP_ID, 
"average-judge-by-phase-report")
   .build();
tEnv.createTable("AverageJudgeByPhaseReport_Kafka", 
averageJudgeByPhaseReportTableDescriptor);
Table averageJudgeByPhaseReportTable = 
tEnv.from("AverageJudgeByPhaseReport_Kafka");

Table renamedAverageJudgeByPhaseReportTable = averageJudgeByPhaseReportTable
   .select(
 $("giudice").as("giudice_media"),
 $("fase").as("fase_media"),
 $("media_mobile")
   );



And here’s the code I’m experimenting with:

phasesDurationsTable
   .join(renamedAverageJudgeByPhaseReportTable)
   .where($("giudice").isEqual($("giudice_media")))
   .window(Tumble.over(lit(30).days()).on($("data_inizio")).as("w"))
   .groupBy(
 $("giudice"),
 $("w")
   )
   .select(
 $("giudice")
   )
   .execute().print();



Am I doing something wrong?


Postgres Jdbc Exactly Once Sink Deadlock with UPSERT query

2023-09-21 Thread Pascal Rosenkranz
Hi  everyone,

In a streaming job, we are using the JdbcSink.exactlyOnceSink() as described on
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink.

The query we're using is an UPSERT query. I cannot post the exact query, but
in essence it looks something like this:

INSERT INTO edges(edge_id, count) VALUES(?, ?) ON CONFLICT (edge_id) DO
UPDATE SET count = edges.count + EXCLUDED.count;

Of course, the table has a Primary Key on `edge_id`.

Batching is enabled with 250 UPSERTs per batch. Additionally, we set
withMaxRetries() to 0 on the JdbcExecutionOptions. 
On the JdbcExactlyOnceOptions, we set withTransactionPerConnection() to
true, as per documentation.
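
For context, the wiring looks roughly like this (simplified sketch; Edge is a
placeholder POJO, edgeStream a placeholder DataStream<Edge>, and the JDBC URL
is a placeholder; it uses org.apache.flink.connector.jdbc.JdbcSink,
JdbcExecutionOptions, JdbcExactlyOnceOptions and org.postgresql.xa.PGXADataSource):

edgeStream.addSink(JdbcSink.exactlyOnceSink(
    "INSERT INTO edges(edge_id, count) VALUES(?, ?) "
        + "ON CONFLICT (edge_id) DO UPDATE SET count = edges.count + EXCLUDED.count",
    (PreparedStatement ps, Edge e) -> {
        ps.setLong(1, e.edgeId);
        ps.setLong(2, e.count);
    },
    JdbcExecutionOptions.builder()
        .withBatchSize(250)              // 250 UPSERTs per batch
        .withMaxRetries(0)               // withMaxRetries(0), as described above
        .build(),
    JdbcExactlyOnceOptions.builder()
        .withTransactionPerConnection(true)
        .build(),
    () -> {
        PGXADataSource ds = new PGXADataSource();
        ds.setUrl("jdbc:postgresql://db-host:5432/mydb");   // placeholder
        return ds;
    }));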

We're using unaligned checkpointing. Once a checkpoint is triggered, the
Sink will prepare a transaction in the database, I can see that in the
pg_locks.
However, that prepared transaction is never committed. Instead, the Sink
function will already be in the invoke() for the next stream elements. Now,
since these new elements happen to conflict on keys from the previous (still
uncommitted) transaction, it seems like the new transaction (new connection
due to Postgres XA) is not able to acquire a 'ShareLock' lock - I see the
process is permanently stuck in (wait_event_type=Lock,
wait_event="transactionid") in pg_stat_activity.

Why is that first transaction not committed before new statements are
executed? 

Am I making a wrong assumption that the Sink would work with UPSERTs, or is
this a potential bug in the XA sink function?


Thanks in advance.

Best,
Pascal.


PS: We're using Flink 1.16.2. The JDBC connector dependency version is
3.1.1-1.16.