How to convert Table containing TIMESTAMP_LTZ into DataStream in PyFlink 1.15.0?

2022-06-24 Thread John Tipper
Hi,

I have a source table using a Kinesis connector reading events from AWS 
EventBridge using PyFlink 1.15.0. An example of the sorts of data that are in 
this stream is here: 
https://docs.aws.amazon.com/codebuild/latest/userguide/sample-build-notifications.html#sample-build-notifications-ref.
 Note that the stream of data contains many different types of events, where 
the 'detail' field is completely different between different event types. There 
is no support for this connector using PyFlink DataStream API, so I use the 
Table API to construct the source table.  The table looks like this:


CREATE TABLE events (
 `id` VARCHAR,
 `source` VARCHAR,
 `account` VARCHAR,
 `region` VARCHAR,
 `detail-type` VARCHAR,
 `detail` VARCHAR,
 `source` VARCHAR,
 `resources` VARCHAR,
 `time` TIMESTAMP(0) WITH LOCAL TIME ZONE,
 WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND,
 PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
...
)


The table was created using:

 table_env.execute_sql(CREATE_STRING_ABOVE)

I'd like to turn this table into a data stream so I can perform some processing 
that is easier to do in the DataStream API:


events_stream_table = table_env.from_path('events')

events_stream = table_env.to_data_stream(events_stream_table)

# now do some processing - let's filter by the type of event we get

codebuild_stream = events_stream.filter(
lambda event: event['source'] == 'aws.codebuild'
)

# now do other stuff on a stream containing only events that are identical in 
shape
...
# maybe convert back into a Table and perform SQL on the data


When I run this, I get an exception:



org.apache.flink.table.api.TableException: Unsupported conversion from data type

 'TIMESTAMP(6) WITH TIME ZONE' (conversion class: java.time.OffsetDateTime) to

type information. Only data types that originated from type information fully

support a reverse conversion.

Somebody reported a similar error here 
(https://stackoverflow.com/questions/58936529/using-jdbctablesource-with-streamtableenvironment-gives-classcastexception)
 When I try the suggestion there and replace the "TIMESTAMP(0) WITH LOCAL TIME 
ZONE" with a "TIMESTAMP(3)" I get a different exception:

TypeError: The java type info: LocalDateTime is not supported in PyFlink 
currently.

Is there a way of converting this Table into a DataStream (and then back 
again)? I need to use the data in the "time"​ field as the source of watermarks 
for my events.

Many thanks,

John


Re: Flink k8s Operator on AWS?

2022-06-24 Thread Matt Casters
Yes of-course.  I already feel a bit less intelligent for having asked the
question ;-)

The status now is that I managed to have it all puzzled together.  Copying
the files from s3 to an ephemeral volume takes all of 2 seconds so it's
really not an issue.  The cluster starts and our fat jar and Apache Hop
MainBeam class is found and started.

The only thing that remains is figuring out how to configure the Flink
cluster itself.  I have a couple of m5.large ec2 instances in a node group
on EKS and I set taskmanager.numberOfTaskSlots to "4".  However, the tasks
in the pipeline can't seem to find resources to start.

Caused by:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Slot request bulk is not fulfillable! Could not allocate the required slot
within slot request timeout

Parallelism was set to 1 for the runner and there are only 2 tasks in my
first Beam pipeline so it should be simple enough but it just times out.

Next step for me is to document the result which will end up on
hop.apache.org.   I'll probably also want to demo this in Austin at the
upcoming Beam summit.

Thanks a lot for your time and help so far!

Cheers,
Matt


Re: Flink k8s Operator on AWS?

2022-06-24 Thread Őrhidi Mátyás
Hi Matt,

Yes. There are several official Flink images with various JVMs including
Java 11.

https://hub.docker.com/_/flink

Cheers,
Matyas

On Fri, Jun 24, 2022 at 2:06 PM Matt Casters 
wrote:

> Hi Mátyás & all,
>
> Thanks again for the advice so far. On a related note I noticed Java 8
> being used, indicated in the log.
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>  JAVA_HOME: /usr/local/openjdk-8
>
> Is there a way to use Java 11 to start Flink with?
>
> Kind regards,
>
> Matt
>
> On Tue, Jun 21, 2022 at 4:53 PM Őrhidi Mátyás 
> wrote:
>
>> Hi Matt,
>>
>> I believe an artifact fetcher (e.g
>> https://hub.docker.com/r/agiledigital/s3-artifact-fetcher ) + the pod
>> template (
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template)
>> is an elegant way to solve your problem.
>>
>> The operator uses K8s native integration under the hood:
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode
>>  In
>> application mode,  the main() method of the application is executed on the
>> JobManager, hence we need the jar locally.
>>
>> You can launch a session cluster (without job spec) on the operator that
>> allows submitting jars if you would like to avoid dealing with
>> authentication, but the recommended and safe approach is to use
>> sessionjobs for this purpose.
>>
>>
>> Cheers,
>> Matyas
>>
>> On Tue, Jun 21, 2022 at 4:03 PM Matt Casters <
>> matt.cast...@neotechnology.com> wrote:
>>
>>> Thank you very much for the help Matyas and Gyula!
>>>
>>> I just saw a video today where you were presenting the FKO.  Really nice
>>> stuff!
>>>
>>> So I'm guessing we're executing "flink run" at some point on the master
>>> and that this is when we need the jar file to be local?
>>> Am I right in assuming that this happens after the flink cluster in
>>> question was started, as part of the job execution?
>>>
>>> On the one hand I agree with the underlying idea that authentication and
>>> security should not be a responsibility of the operator.   On the other
>>> hand I could add a flink-s3 driver but then I'd also have to configure it
>>> and so on and it's just hard to get that configuration to be really clean.
>>>
>>> Do we have some service running on the flink cluster which would allow
>>> us to post/copy files from the client (running kubectl) to the master?  If
>>> so, could we add an option to the job specification to that effect?  Just
>>> brainstorming ;-) (and forking apache/flink-kubernetes-operator)
>>>
>>> All the best,
>>> Matt
>>>
>>> On Tue, Jun 21, 2022 at 2:52 PM Őrhidi Mátyás 
>>> wrote:
>>>
 Hi Matt,

 - In FlinkDeployments you can utilize an init container to download
 your artifact onto a shared volume, then you can refer to it as local:/..
 from the main container. FlinkDeployments comes with pod template support
 https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template

 - FlinkSessionJobs comes with an artifact fetcher, but it may need some
 tweaking to make it work on your environment:

 https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#flinksessionjob-spec-overview

 I hope it helps, let us know if you have further questions.

 Cheers,
 Matyas



 On Tue, Jun 21, 2022 at 2:35 PM Matt Casters <
 matt.cast...@neotechnology.com> wrote:

> Hi Flink team!
>
> I'm interested in getting the new Flink Kubernetes Operator to work on
> AWS EKS.  Following the documentation I got pretty far.  However, when
> trying to run a job I got the following error:
>
> Only "local" is supported as schema for application mode. This assumes
>> t
>> hat the jar is located in the image, not the Flink client. An example
>> of such path is: local:///opt/flink/examples/streaming/WindowJoin.jar
>
>
>  I have an Apache Hop/Beam fat jar capable of running the Flink
> pipeline in my yml file:
>
> jarURI: s3://hop-eks/hop/hop-2.1.0-fat.jar
>
> So how could I go about getting the fat jar in a desired location for
> the operator?
>
> Getting this to work would be really cool for both short and
> long-lived pipelines in the service of all sorts of data integration work.
> It would do away with the complexity of setting up and maintaining your 
> own
> Flink cluster.
>
> Thanks in advance!
>
> All the best,
>
> Matt (mcasters, Apache Hop PMC)
>
>


Re: 来自潘明文的邮件

2022-06-24 Thread Lincoln Lee
Hi,
   邮件中直接贴图片无法正常看到,可以发下文本

Best,
Lincoln Lee


潘明文  于2022年6月24日周五 16:36写道:

> 你好,下面2个SINK 能够并发同时处理吗?还是要窜行,等第一个SINK 好了,才能第二个SINK.
>
>


Re: lookup join对应task无法正常恢复?

2022-06-24 Thread Lincoln Lee
Hi,
   请问使用的 mysql 维表是 flink sql 内置的 jdbc connector 吗? 如果是的话,对应内部的 cache 只是读缓存,
并不会持久化, 任务重启或者到达设定的缓存淘汰条件就失效了
   如果是自己开发的维表,建议增加相应的数据加载日志, 以便确认 failover 时的处理是否有异常

Best,
Lincoln Lee


Xuchao  于2022年6月24日周五 17:15写道:

> 您好!
> 我在使用flink时遇到一些问题。
> flink-1.14.4
> sqlserver-cdc-2.2.1
> yarn-per-job
>
> 我有一个任务,先是双流join,再与mysql维表lookup join,开启增量检查点;
> sqlsever-cdc短暂故障,任务失败,自动恢复,但是lookup join对应task不再输出数据;
> 检查发现,加载维表数据为0,即任务恢复时未加载一次全量维表数据;
>
> 以上,可能是什么问题,应该如何解决呢?
>
> 期待回复!
> best wishes!
>
> 附日志:
> 2022-06-24 14:55:45,950 ERROR
> com.ververica.cdc.debezium.internal.Handover [] - Reporting
> error:
> com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
> An exception occurred in the change event producer. This connector will be
> stopped.
> at
> io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:292)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> ~[?:1.8.0_301]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_301]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_301]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_301]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301]
> Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 为过程或函数
> cdc.fn_cdc_get_all_changes_ ...  提供的参数数目不足。
> at
> com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5427)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1758)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1016)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:181)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:608)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:226)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:171)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> ... 7 more
> 2022-06-24 14:55:45,953 INFO  io.debezium.embedded.EmbeddedEngine
> [] - Stopping the embedded engine
> 2022-06-24 14:55:45,954 INFO
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
> [] - Source: TableSourceScan(table=[[default_catalog, default_database,
> carflow]], fields=[id, plate_license, site_id, create_time, flow_type,
> circle_id]) -> Calc(select=[id, plate_license, site_id, create_time,
> (create_time + -2880:INTERVAL HOUR) AS c_time, flow_type, circle_id])
> -> WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0
> discarding 0 drained requests
> 2022-06-24 14:55:45,955 INFO  io.debezium.embedded.EmbeddedEngine
> [] - Stopping the embedded engine
> 2022-06-24 14:55:45,957 WARN  org.apache.flink.runtime.taskmanager.Task
> [] - Source: TableSourceScan(table=[[default_catalog,
> default_database, carflow]], fields=[id, plate_license, site_id,
> create_time, flow_type, circle_id]) -> Calc(select=[id, plate_license,
> site_id, create_time, (create_time + -2880:INTERVAL HOUR) AS c_time,
> flow_type, circle_id]) -> WatermarkAssigner(rowtime=[c_time],
> watermark=[c_time]) (1/1)#0 (71206ba8149ac20bb39d8169ff3d2f02) switched
> from RUNNING to FAILED with failure cause:
> com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
> An exception occurred in the change event producer. This connector will be
> stopped.
> at
> io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
> at
> 

Re: How to use connectors in PyFlink 1.15.0 when not defined in Python API?

2022-06-24 Thread Andrew Otto
I've had success using the Java in pyflink via pyflink.java_gateway.
Something like:

from pyflink.java_gateway import get_gateway
jvm = get_gateway()

# then perhaps something like:
FlinkKinesisConsumer = jvm.
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer

There also seems to be a nice java_utils.py

 with helpers that may uh, help.

Not sure if this will work, you might need to use the python env's a java
StreamTableEnvironment to do it?  Here's an example

of how the python StreamTableEnvironment calls out to the Java one.

BTW: I'm not an authority nor I have I really tried this, so take this
advice with a grain of salt!  :)

Good luck!






On Fri, Jun 24, 2022 at 9:06 AM John Tipper  wrote:

> Hi all,
>
> There are a number of connectors which do not appear to be in the Python
> API v1.15.0, e.g. Kinesis. I can see that it's possible to use these
> connectors by using the Table API:
>
> CREATE TABLE my_table (...)
> WITH ('connector' = 'kinesis' ...)
>
>
> I guess if you wanted the stream as a DataStream you'd I guess you'd
> create the Table and then convert into a DataStream?
>
> Is there a way of directly instantiating these connectors in PyFlink
> without needed to use SQL like this (and without having to wait until
> v1.16)? e.g. the Java API looks like this:
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>(
> "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
>
>
> Many thanks,
>
> John
>


How to use connectors in PyFlink 1.15.0 when not defined in Python API?

2022-06-24 Thread John Tipper
Hi all,

There are a number of connectors which do not appear to be in the Python API 
v1.15.0, e.g. Kinesis. I can see that it's possible to use these connectors by 
using the Table API:

CREATE TABLE my_table (...)
WITH ('connector' = 'kinesis' ...)

I guess if you wanted the stream as a DataStream you'd I guess you'd create the 
Table and then convert into a DataStream?

Is there a way of directly instantiating these connectors in PyFlink without 
needed to use SQL like this (and without having to wait until v1.16)? e.g. the 
Java API looks like this:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>(
"kinesis_stream_name", new SimpleStringSchema(), consumerConfig));

Many thanks,

John


Re: Flink k8s Operator on AWS?

2022-06-24 Thread Matt Casters
Hi Mátyás & all,

Thanks again for the advice so far. On a related note I noticed Java 8
being used, indicated in the log.

org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
 JAVA_HOME: /usr/local/openjdk-8

Is there a way to use Java 11 to start Flink with?

Kind regards,

Matt

On Tue, Jun 21, 2022 at 4:53 PM Őrhidi Mátyás 
wrote:

> Hi Matt,
>
> I believe an artifact fetcher (e.g
> https://hub.docker.com/r/agiledigital/s3-artifact-fetcher ) + the pod
> template (
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template)
> is an elegant way to solve your problem.
>
> The operator uses K8s native integration under the hood:
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode
>  In
> application mode,  the main() method of the application is executed on the
> JobManager, hence we need the jar locally.
>
> You can launch a session cluster (without job spec) on the operator that
> allows submitting jars if you would like to avoid dealing with
> authentication, but the recommended and safe approach is to use
> sessionjobs for this purpose.
>
>
> Cheers,
> Matyas
>
> On Tue, Jun 21, 2022 at 4:03 PM Matt Casters <
> matt.cast...@neotechnology.com> wrote:
>
>> Thank you very much for the help Matyas and Gyula!
>>
>> I just saw a video today where you were presenting the FKO.  Really nice
>> stuff!
>>
>> So I'm guessing we're executing "flink run" at some point on the master
>> and that this is when we need the jar file to be local?
>> Am I right in assuming that this happens after the flink cluster in
>> question was started, as part of the job execution?
>>
>> On the one hand I agree with the underlying idea that authentication and
>> security should not be a responsibility of the operator.   On the other
>> hand I could add a flink-s3 driver but then I'd also have to configure it
>> and so on and it's just hard to get that configuration to be really clean.
>>
>> Do we have some service running on the flink cluster which would allow us
>> to post/copy files from the client (running kubectl) to the master?  If so,
>> could we add an option to the job specification to that effect?  Just
>> brainstorming ;-) (and forking apache/flink-kubernetes-operator)
>>
>> All the best,
>> Matt
>>
>> On Tue, Jun 21, 2022 at 2:52 PM Őrhidi Mátyás 
>> wrote:
>>
>>> Hi Matt,
>>>
>>> - In FlinkDeployments you can utilize an init container to download your
>>> artifact onto a shared volume, then you can refer to it as local:/.. from
>>> the main container. FlinkDeployments comes with pod template support
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template
>>>
>>> - FlinkSessionJobs comes with an artifact fetcher, but it may need some
>>> tweaking to make it work on your environment:
>>>
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#flinksessionjob-spec-overview
>>>
>>> I hope it helps, let us know if you have further questions.
>>>
>>> Cheers,
>>> Matyas
>>>
>>>
>>>
>>> On Tue, Jun 21, 2022 at 2:35 PM Matt Casters <
>>> matt.cast...@neotechnology.com> wrote:
>>>
 Hi Flink team!

 I'm interested in getting the new Flink Kubernetes Operator to work on
 AWS EKS.  Following the documentation I got pretty far.  However, when
 trying to run a job I got the following error:

 Only "local" is supported as schema for application mode. This assumes t
> hat the jar is located in the image, not the Flink client. An example
> of such path is: local:///opt/flink/examples/streaming/WindowJoin.jar


  I have an Apache Hop/Beam fat jar capable of running the Flink
 pipeline in my yml file:

 jarURI: s3://hop-eks/hop/hop-2.1.0-fat.jar

 So how could I go about getting the fat jar in a desired location for
 the operator?

 Getting this to work would be really cool for both short and long-lived
 pipelines in the service of all sorts of data integration work.  It would
 do away with the complexity of setting up and maintaining your own Flink
 cluster.

 Thanks in advance!

 All the best,

 Matt (mcasters, Apache Hop PMC)




lookup join对应task无法正常恢复?

2022-06-24 Thread Xuchao
您好!
我在使用flink时遇到一些问题。
flink-1.14.4
sqlserver-cdc-2.2.1
yarn-per-job

我有一个任务,先是双流join,再与mysql维表lookup join,开启增量检查点;
sqlsever-cdc短暂故障,任务失败,自动恢复,但是lookup join对应task不再输出数据;
检查发现,加载维表数据为0,即任务恢复时未加载一次全量维表数据;

以上,可能是什么问题,应该如何解决呢?

期待回复!
best wishes!

附日志:
2022-06-24 14:55:45,950 ERROR com.ververica.cdc.debezium.internal.Handover  
   [] - Reporting error:
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
 An exception occurred in the change event producer. This connector will be 
stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) 
~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
at 
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:292)
 ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at 
io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152)
 ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
at 
io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119)
 ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_301]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_301]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_301]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_301]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301]
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 为过程或函数 
cdc.fn_cdc_get_all_changes_ ...  提供的参数数目不足。
at 
com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
 ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at 
com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5427)
 ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at 
com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1758)
 ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at 
com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1016)
 ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at 
io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63)
 ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
at 
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:181)
 ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:608) 
~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
at 
io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:226)
 ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at 
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:171)
 ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
... 7 more
2022-06-24 14:55:45,953 INFO  io.debezium.embedded.EmbeddedEngine   
   [] - Stopping the embedded engine
2022-06-24 14:55:45,954 INFO  
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
 [] - Source: TableSourceScan(table=[[default_catalog, default_database, 
carflow]], fields=[id, plate_license, site_id, create_time, flow_type, 
circle_id]) -> Calc(select=[id, plate_license, site_id, create_time, 
(create_time + -2880:INTERVAL HOUR) AS c_time, flow_type, circle_id]) -> 
WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0 discarding 0 
drained requests
2022-06-24 14:55:45,955 INFO  io.debezium.embedded.EmbeddedEngine   
   [] - Stopping the embedded engine
2022-06-24 14:55:45,957 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - Source: TableSourceScan(table=[[default_catalog, 
default_database, carflow]], fields=[id, plate_license, site_id, create_time, 
flow_type, circle_id]) -> Calc(select=[id, plate_license, site_id, create_time, 
(create_time + -2880:INTERVAL HOUR) AS c_time, flow_type, circle_id]) -> 
WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0 
(71206ba8149ac20bb39d8169ff3d2f02) switched from RUNNING to FAILED with failure 
cause: 
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
 An exception occurred in the change event producer. This connector will be 
stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at 
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:292)
at 
io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152)
at 
io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119)
at 

来自潘明文的邮件

2022-06-24 Thread 潘明文
你好,下面2个SINK 能够并发同时处理吗?还是要窜行,等第一个SINK 好了,才能第二个SINK.