A question about interfaces and sub-interfaces re-declaring the same methods in many open-source libraries

2021-06-23 Thread yidan zhao
As the subject says: this pattern keeps showing up in Flink and other open-source libraries. I have always ignored it, but it really hurts readability for me.
For example, Netty's ChannelPipeline interface extends the ChannelInboundInvoker interface, yet it re-declares every one of that interface's methods in its own body. Isn't that redundant?
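One common reason for this pattern, offered here as a hedged illustration rather than Netty's documented rationale, is covariant return-type narrowing: re-declaring the inherited methods with the sub-interface as the return type keeps fluent call chains on the more specific type. A minimal Java sketch (names are illustrative, not Netty's actual declarations):

interface InboundInvoker {
    InboundInvoker fireChannelRead(Object msg);
}

interface Pipeline extends InboundInvoker {
    // Re-declared only to narrow the return type (covariant override).
    @Override
    Pipeline fireChannelRead(Object msg);
}

class PipelineDemo {
    static void use(Pipeline p) {
        // Without the re-declaration, the chain would degrade to InboundInvoker after the first call.
        p.fireChannelRead("a").fireChannelRead("b");
    }
}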


Re: PyFlink kafka producer topic override

2021-06-23 Thread Curt Buechter
Hi Dian,
Thanks for the reply.
I don't think a filter function makes sense here. I have 2,000 tenants in
the source database, and I want all records for a single tenant in a
tenant-specific topic. So, with a filter function, if I understand it
correctly, I would need 2,000 different filters, which isn't very practical.

An example:
source_topic(tenant_id, first_name, last_name)

destination:
tenant1.sink_topic (first_name, last_name)
tenant2.sink_topic (first_name, last_name)
...
tenant2000.sink_topic (first_name, last_name)
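For reference, a rough sketch of the Java-side approach mentioned in this thread: a serialization schema derives the target topic from the tenant id on each record, so one Kafka sink fans records out to per-tenant topics. This is a hedged sketch against the Flink 1.12/1.13 Kafka connector; TenantRecord and its fields are illustrative, not code from this thread.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TenantTopicRoutingSketch {

    /** Illustrative record type: one row of (tenant_id, first_name, last_name). */
    public static class TenantRecord {
        public String tenantId;
        public String payload;
    }

    public static void attachSink(DataStream<TenantRecord> stream, Properties kafkaProperties) {
        // The schema chooses the target topic per record, e.g. "tenant42.sink_topic".
        KafkaSerializationSchema<TenantRecord> schema =
            (record, timestamp) -> new ProducerRecord<>(
                record.tenantId + ".sink_topic",
                record.payload.getBytes(StandardCharsets.UTF_8));

        stream.addSink(new FlinkKafkaProducer<>(
            "fallback.sink_topic",        // default topic, only used if no topic is chosen
            schema,
            kafkaProperties,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
    }
}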

On 2021/06/24 03:18:36, Dian Fu  wrote:
> You are right that split is still not supported. Does it make sense for
> you to split the stream using a filter function? There is some overhead
> compared to the built-in stream.split, as you need to provide a filter
> function for each sub-stream and so a record will be evaluated multiple times.
>
> > On Jun 24, 2021, at 3:08 AM, Curt Buechter  wrote:>
> > >
> > Hi,>
> > New PyFlink user here. Loving it so far. The first major problem I've
run into is that I cannot create a Kafka Producer with dynamic topics. I
see that this has been available for quite some time in Java with Keyed
Serialization using the getTargetTopic method. Another way to do this in
Java may be with stream.split(), and adding a different sink for the split
streams. Stream splitting is also not available in PyFlink.>
> > Am I missing anything? Has anyone implemented this before in PyFlink,
or know of a way to make it happen?>
> > The use case here is that I'm using a CDC Debezium connector to
populate kafka topics from a multi-tenant database, and I'm trying to use
PyFlink to split the records into a different topic for each tenant.>
> > >
> > Thanks>
>
>


Flink 1.12: one JobManager is stuck in leader election

2021-06-23 Thread yidan zhao
The log of the problematic JobManager is below; it looks like it was quarantined?? After restarting that JobManager, everything was fine again.

2021-06-24 11:30:18,756 WARN  akka.remote.Remoting
[] - Tried to associate with unreachable remote
address [akka.tcp://fl...@bjhw-aisecurity-flink03.bjhw:13141]. Address
is now gated for 50 ms, all messages to this address will be delivered
to dead letters. Reason: [The remote system has quarantined this
system. No further associations to the remote system are possible
until this system is restarted.]

(The same WARN is repeated at 11:30:18,826, 11:30:18,897, 11:30:18,967, 11:31:19,059 and 11:31:19,135.)


Re: PyFlink kafka producer topic override

2021-06-23 Thread Dian Fu
You are right that split is still not supported. Does it make sense for you to 
split the stream using a filter function? There is some overhead compared to the 
built-in stream.split, as you need to provide a filter function for each 
sub-stream and so a record will be evaluated multiple times.
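A rough sketch of that workaround: one filter (and one sink) per target sub-stream, so every record is evaluated once per filter. This is hedged and illustrative; TenantRecord and the sink factory are assumptions, not code from this thread.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class FilterSplitSketch {

    /** Illustrative record type. */
    public static class TenantRecord {
        public String tenantId;
    }

    /** Illustrative sink factory: builds a sink (e.g. a Kafka producer) for the given topic. */
    public interface SinkFactory extends java.io.Serializable {
        SinkFunction<TenantRecord> forTopic(String topic);
    }

    public static void splitByTenant(DataStream<TenantRecord> stream,
                                     Iterable<String> tenants,
                                     SinkFactory sinks) {
        for (String tenant : tenants) {                       // e.g. "tenant1" ... "tenant2000"
            stream
                .filter(r -> tenant.equals(r.tenantId))       // every record hits every filter
                .addSink(sinks.forTopic(tenant + ".sink_topic"));
        }
    }
}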

> On Jun 24, 2021, at 3:08 AM, Curt Buechter  wrote:
> 
> Hi,
> New PyFlink user here. Loving it so far. The first major problem I've run 
> into is that I cannot create a Kafka Producer with dynamic topics. I see that 
> this has been available for quite some time in Java with Keyed Serialization 
> using the getTargetTopic method. Another way to do this in Java may be with 
> stream.split(), and adding a different sink for the split streams. Stream 
> splitting is also not available in PyFlink.
> Am I missing anything? Has anyone implemented this before in PyFlink, or know 
> of a way to make it happen?
> The use case here is that I'm using a CDC Debezium connector to populate 
> kafka topics from a multi-tenant database, and I'm trying to use PyFlink to 
> split the records into a different topic for each tenant.
> 
> Thanks



Re: Flink Kubernetes HA

2021-06-23 Thread Yang Wang
From the implementation of DefaultCompletedCheckpointStore, Flink will only
retain the configured number of checkpoints.

Maybe you could also check the content of the jobmanager-leader ConfigMap. It
should have the same number of pointers to the completedCheckpoint files.
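For reference, the retained-checkpoint count Yang refers to maps to the state.checkpoints.num-retained option. A hedged Java sketch of setting it programmatically follows; in most deployments you would put the key in flink-conf.yaml instead, and whether the programmatic route is honored depends on the deployment mode (assumption).

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointsSketch {
    public static void main(String[] args) throws Exception {
        // Keep only one completed checkpoint (state.checkpoints.num-retained = 1).
        Configuration conf = new Configuration();
        conf.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 1);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(60_000);
        // ... build the job, then env.execute("...")
    }
}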


Best,
Yang

Ivan Yang  wrote on Thu, Jun 24, 2021 at 2:25 AM:

> Thanks for the reply. Yes, we are seeing all the completedCheckpoint files
> and they keep growing. We will revisit our k8s setup, ConfigMap, etc.
>
> On Jun 23, 2021, at 2:09 AM, Yang Wang  wrote:
>
> Hi Ivan,
>
> Regarding the completedCheckpoint files that keep growing: do you mean too many
> files exist in the S3 bucket?
>
> AFAIK, if the K8s HA services work normally, only
> one completedCheckpoint file will be retained. Once a
> new one is generated, the old one will be deleted.
>
>
> Best,
> Yang
>
> Ivan Yang  wrote on Wed, Jun 23, 2021 at 12:31 AM:
>
>> Hi Dear Flink users,
>>
>> We recently enabled the ZooKeeper-less HA in our Kubernetes
>> Flink deployment. The setup has
>>
>> high-availability.storageDir: s3://some-bucket/recovery
>>
>>
>> Since we have a retention policy on the S3 bucket (relatively short, 7
>> days), HA will fail if the submittedJobGraph
>> xx file is deleted by S3. If we remove the retention policy,
>> the completedCheckpoint
>> files will keep growing. The only way I can think of is to use a
>> pattern-based file retention policy in S3. Before I do that, are there any
>> config keys available in Flink I can tune so that not all the
>> completedCheckpoint* files are kept in HA?
>>
>> Thanks,
>> Ivan
>>
>>
>>
>>
>>
>
>


Re: multiple jobs in same flink app

2021-06-23 Thread Yang Wang
Robert is right. We can only support single-job submission in application
mode when HA is enabled.

This is a known limitation of current application mode implementation.

Best,
Yang

Robert Metzger  wrote on Thu, Jun 24, 2021 at 3:54 AM:

> Thanks a lot for checking again. I just started Flink in Application mode
> with a jar that contains two "executeAsync" submissions, and indeed two
> jobs are running.
>
> I think the problem in your case is that you are using High Availability
> (I guess, because there are log statements from the
> ZooKeeperLeaderRetrievalService). As you can see from the documentation [1]:
>
> The Application Mode allows for multi-execute() applications but
>> High-Availability is not supported in these cases. High-Availability in
>> Application Mode is only supported for single-execute() applications.
>
>
> See also: https://issues.apache.org/jira/browse/FLINK-19358
>
> Sorry again that I gave you invalid information in my first answer.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/overview/
>
>
>
>
> On Wed, Jun 23, 2021 at 8:50 AM Qihua Yang  wrote:
>
>> Hi Robert,
>>
>> But I saw the Flink docs say application mode can run multiple jobs? Or do I
>> misunderstand it?
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/
>>
>>
>>
>> *Compared to the Per-Job mode, the Application Mode allows the submission of 
>> applications consisting of multiple jobs. The order of job execution is not 
>> affected by the deployment mode but by the call used to launch the job. 
>> Using execute(), which is blocking, establishes an order and it will lead to 
>> the execution of the "next" job being postponed until "this" job finishes. 
>> Using executeAsync(), which is non-blocking, will lead to the "next" job 
>> starting before "this" job finishes.*
>>
>>
>> On Tue, Jun 22, 2021 at 11:13 PM Robert Metzger 
>> wrote:
>>
>>> Hi Qihua,
>>>
>>> Application Mode is meant for executing one job at a time, not multiple
>>> jobs on the same JobManager.
>>> If you want to do that, you need to use session mode, which allows
>>> managing multiple jobs on the same JobManager.
>>>
>>> On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang  wrote:
>>>
 Hi Arvid,

 Do you know if I can start multiple jobs for a single flink application?

 Thanks,
 Qihua


 On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang  wrote:

> Hi,
>
> I am using application mode.
>
> Thanks,
> Qihua
>
> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise  wrote:
>
>> Hi Qihua,
>>
>> Which execution mode are you using?
>>
>> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang  wrote:
>>
>>> Hi,
>>>
>>> Thank you for your reply. What I want is a Flink app with multiple
>>> jobs, where each job manages a stream. Currently our Flink app has only
>>> 1 job that
>>> manages multiple streams.
>>> I did try env.executeAsync(), but it still doesn't work. From the
>>> log, when the second executeAsync() was called, it shows " *Job
>>>  was recovered successfully.*"
>>> Looks like the second executeAsync() recovered the first job instead of
>>> starting a second job.
>>>
>>> Thanks,
>>> Qihua
>>>
>>>
>>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise 
>>> wrote:
>>>
 Hi,

 env.execute("Job 1"); is a blocking call. You either have to use
 executeAsync or use a separate thread to submit the second job. If Job 
 1
 finishes then this would also work by having sequential execution.

 However, I think what you actually want to do is to use the same
 env with 2 topologies and 1 single execute like this.

 StreamExecutionEnvironment env =
 StreamExecutionEnvironment.getExecutionEnvironment();
 DataStream stream1 = env.addSource(new
 SourceFunction());
 stream1.addSink(new FlinkKafkaProducer());
 DataStream stream2 = env.addSource(new
 SourceFunction());
 stream2.addSink(new DiscardingSink<>());
 env.execute("Job 1+2");

 On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang 
 wrote:

> Hi,
> Does anyone know how to run multiple jobs in same flink
> application?
> I did a simple test.  First job was started. I did see the log
> message, but I didn't see the second job was started, even I saw the 
> log
> message.
>
> public void testJobs() throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream stream1 = env.addSource(new
> SourceFunction());
> stream1.addSink(new FlinkKafkaProducer());
> printf("### first job");
> env.execute("Job 1");
>
> env = 

Re: Processing-time temporal join is not supported yet

2021-06-23 Thread op
watermarkeventtimewatermarkkey




------ Original message ------
From: 
   "user-zh"

https://issues.apache.org/jira/browse/FLINK-19830 


Re: Processing-time temporal join is not supported yet

2021-06-23 Thread jiangshan0...@163.com
 Join  
?? Join ?? 
?? Join ?? Join  Join 
??  join 
??,
 ??

 
watermark



jiangshan0...@163.com
 
From: op
Sent: 2021-06-23 17:03
To: user-zh
Subject: Processing-time temporal join is not supported yet
Hi, I get an error when doing a temporal join with Kafka tables:
org.apache.flink.table.api.TableException: Processing-time temporal join is not 
supported yet.
 
The SQL is as follows:
 
 
create view visioned_table as
 select
 user_id,
 event
from
(select
  user_id,
  event,
  row_number() over(partition by user_id order by event_time desc) 
as rn
  from kafka_table1
  )ta where rn=1;
select
   t1.*,t2.*
  from mvp_rtdwd_event_app_quit t1
join visioned_table FOR SYSTEM_TIME AS OF t1.proc_time AS t2
 on t1.user_id=t2.user_id
 where t1.user_id is not null


Re: multiple jobs in same flink app

2021-06-23 Thread Robert Metzger
Thanks a lot for checking again. I just started Flink in Application mode
with a jar that contains two "executeAsync" submissions, and indeed two
jobs are running.

I think the problem in your case is that you are using High Availability (I
guess, because there are log statements from the
ZooKeeperLeaderRetrievalService). As you can see from the documentation [1]:

The Application Mode allows for multi-execute() applications but
> High-Availability is not supported in these cases. High-Availability in
> Application Mode is only supported for single-execute() applications.


See also: https://issues.apache.org/jira/browse/FLINK-19358

Sorry again that I gave you invalid information in my first answer.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/overview/
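A hedged sketch of the kind of jar Robert describes: two executeAsync() submissions from one main(), which yields two jobs in Application Mode as long as HA is not enabled. The names and the trivial pipelines are illustrative.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class TwoJobsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
        env1.fromElements(1, 2, 3).addSink(new DiscardingSink<>());
        env1.executeAsync("job-1");          // non-blocking submission

        StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
        env2.fromElements("a", "b").addSink(new DiscardingSink<>());
        env2.executeAsync("job-2");          // second job starts without waiting for job-1
    }
}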




On Wed, Jun 23, 2021 at 8:50 AM Qihua Yang  wrote:

> Hi Robert,
>
> But I saw the Flink docs say application mode can run multiple jobs? Or do I
> misunderstand it?
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/
>
>
>
> *Compared to the Per-Job mode, the Application Mode allows the submission of 
> applications consisting of multiple jobs. The order of job execution is not 
> affected by the deployment mode but by the call used to launch the job. Using 
> execute(), which is blocking, establishes an order and it will lead to the 
> execution of the "next" job being postponed until "this" job finishes. Using 
> executeAsync(), which is non-blocking, will lead to the "next" job starting 
> before "this" job finishes.*
>
>
> On Tue, Jun 22, 2021 at 11:13 PM Robert Metzger 
> wrote:
>
>> Hi Qihua,
>>
>> Application Mode is meant for executing one job at a time, not multiple
>> jobs on the same JobManager.
>> If you want to do that, you need to use session mode, which allows
>> managing multiple jobs on the same JobManager.
>>
>> On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang  wrote:
>>
>>> Hi Arvid,
>>>
>>> Do you know if I can start multiple jobs for a single flink application?
>>>
>>> Thanks,
>>> Qihua
>>>
>>>
>>> On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang  wrote:
>>>
 Hi,

 I am using application mode.

 Thanks,
 Qihua

 On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise  wrote:

> Hi Qihua,
>
> Which execution mode are you using?
>
> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang  wrote:
>
>> Hi,
>>
>> Thank you for your reply. What I want is a Flink app with multiple jobs,
>> where each job manages a stream. Currently our Flink app has only 1 job that
>> manages multiple streams.
>> I did try env.executeAsync(), but it still doesn't work. From the log,
>> when the second executeAsync() was called, it shows " *Job
>>  was recovered successfully.*"
>> Looks like the second executeAsync() recovered the first job instead of
>> starting a second job.
>>
>> Thanks,
>> Qihua
>>
>>
>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise  wrote:
>>
>>> Hi,
>>>
>>> env.execute("Job 1"); is a blocking call. You either have to use
>>> executeAsync or use a separate thread to submit the second job. If Job 1
>>> finishes then this would also work by having sequential execution.
>>>
>>> However, I think what you actually want to do is to use the same env
>>> with 2 topologies and 1 single execute like this.
>>>
>>> StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> DataStream stream1 = env.addSource(new
>>> SourceFunction());
>>> stream1.addSink(new FlinkKafkaProducer());
>>> DataStream stream2 = env.addSource(new
>>> SourceFunction());
>>> stream2.addSink(new DiscardingSink<>());
>>> env.execute("Job 1+2");
>>>
>>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang 
>>> wrote:
>>>
 Hi,
 Does anyone know how to run multiple jobs in same flink application?
 I did a simple test.  First job was started. I did see the log
 message, but I didn't see the second job was started, even I saw the 
 log
 message.

 public void testJobs() throws Exception {
 StreamExecutionEnvironment env =
 StreamExecutionEnvironment.getExecutionEnvironment();
 DataStream stream1 = env.addSource(new
 SourceFunction());
 stream1.addSink(new FlinkKafkaProducer());
 printf("### first job");
 env.execute("Job 1");

 env = StreamExecutionEnvironment.getExecutionEnvironment();
 DataStream stream2 = env.addSource(new
 SourceFunction());
 stream2.addSink(new DiscardingSink<>());
 printf("### second job");
 env.execute("Job 2");
 }

 Here is the log:
 ### first job
 INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
 Job  is submitted.

Re: How would Flink job react to change of partitions in Kafka topic?

2021-06-23 Thread Thomas Wang
Sounds good. Thanks.

Thomas

On Wed, Jun 23, 2021 at 11:59 AM Seth Wiesman  wrote:

> It will just work as long as you enable partition discovery.
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#partition-discovery
>
> On Tue, Jun 22, 2021 at 1:32 PM Thomas Wang  wrote:
>
>> Hi,
>>
>> I'm wondering if anyone has changed the number of partitions of a source
>> Kafka topic.
>>
>> Let's say I have a Flink job reading from a Kafka topic which used to have
>> 32 partitions. If I change the number of partitions of that topic to 64,
>> can the Flink job still guarantee exactly-once semantics?
>>
>> Thanks.
>>
>> Thomas
>>
>


PyFlink kafka producer topic override

2021-06-23 Thread Curt Buechter
Hi,
New PyFlink user here. Loving it so far. The first major problem I've run
into is that I cannot create a Kafka Producer with dynamic topics. I see
that this has been available for quite some time in Java with Keyed
Serialization using the getTargetTopic method. Another way to do this in
Java may be with stream.split(), and adding a different sink for the split
streams. Stream splitting is also not available in PyFlink.
Am I missing anything? Has anyone implemented this before in PyFlink, or
know of a way to make it happen?
The use case here is that I'm using a CDC Debezium connector to populate
kafka topics from a multi-tenant database, and I'm trying to use PyFlink to
split the records into a different topic for each tenant.

Thanks


Re: How would Flink job react to change of partitions in Kafka topic?

2021-06-23 Thread Seth Wiesman
It will just work as long as you enable partition discovery.

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#partition-discovery
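A hedged sketch of enabling partition discovery on the DataStream FlinkKafkaConsumer; the interval, topic and connection settings are illustrative.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

public class PartitionDiscoverySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "my-group");
        // Check for new partitions every 60s; newly added partitions are picked up
        // automatically and read from the earliest offset.
        props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "60000");

        env.addSource(new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props))
           .print();
        env.execute("partition-discovery-sketch");
    }
}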

On Tue, Jun 22, 2021 at 1:32 PM Thomas Wang  wrote:

> Hi,
>
> I'm wondering if anyone has changed the number of partitions of a source
> Kafka topic.
>
> Let's say I have a Flink job reading from a Kafka topic which used to have 32
> partitions. If I change the number of partitions of that topic to 64, can
> the Flink job still guarantee exactly-once semantics?
>
> Thanks.
>
> Thomas
>


Re: Flink Kubernetes HA

2021-06-23 Thread Ivan Yang
Thanks for the reply. Yes, we are seeing all the completedCheckpoint files and 
they keep growing. We will revisit our k8s setup, ConfigMap, etc.

> On Jun 23, 2021, at 2:09 AM, Yang Wang  wrote:
> 
> Hi Ivan,
> 
> Regarding the completedCheckpoint files that keep growing: do you mean too many 
> files exist in the S3 bucket?
> 
> AFAIK, if the K8s HA services work normally, only one completedCheckpoint 
> file will be retained. Once a
> new one is generated, the old one will be deleted.
> 
> 
> Best,
> Yang
> 
> Ivan Yang <ivanygy...@gmail.com> wrote on Wed, Jun 23, 2021 
> at 12:31 AM:
> Hi Dear Flink users,
> 
> We recently enabled the ZooKeeper-less HA in our Kubernetes Flink 
> deployment. The setup has
> high-availability.storageDir: s3://some-bucket/recovery
> 
> Since we have a retention policy on the S3 bucket (relatively short, 7 days), 
> HA will fail if the submittedJobGraph 
> xx
> file is deleted by S3. If we remove the retention policy, the completedCheckpoint 
> files will keep growing. The only way I can think of is to use a pattern-based 
> file retention policy in S3. Before I do that, are there any config keys 
> available in Flink I can tune so that not all the completedCheckpoint* files 
> are kept in HA?
> 
> Thanks,
> Ivan
> 
> 
>  



Re: Issues when using a file system as a plugin

2021-06-23 Thread Yaroslav Tkachenko
Hi Arvid,

I've created the following issue:
https://issues.apache.org/jira/browse/FLINK-23127

On Wed, Jun 23, 2021 at 2:09 AM Arvid Heise  wrote:

> This looks like a bug at first glance. Could you please open a ticket for
> that?
>
> If not, I'd do that tomorrow.
>
> On Wed, Jun 23, 2021 at 6:36 AM Yaroslav Tkachenko <
> yaroslav.tkache...@shopify.com> wrote:
>
>> Hi everyone,
>>
>> I need to add support for the GCS filesystem. I have a working example
>> where I add two JARs to the */opt/flink/lib*/ folder:
>>
>> - GCS Hadoop connector
>> - *Shaded* Hadoop using flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
>>
>> Now I'm trying to follow the advice from
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/overview/#pluggable-file-systems
>> and use Plugins instead. I followed the recommendation from here
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/plugins/.
>> Now I have two JARs in the* /opt/flink/plugins/hadoop-gcs/* folder:
>>
>> - GCS Hadoop connector
>> -* Non-shaded* Hadoop using hadoop-common-2.10.1.jar
>>
>> As I can see, shading is not required for plugins, so I want to make it
>> work with a simple non-shaded hadoop-common. However, JobManager fails with
>> the following exceptions:
>>
>>
>>
>> Caused by: org.apache.flink.runtime.client.JobInitializationException:
>> Could not start the JobMaster.
>> at
>> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown
>> Source) ~[?:?]
>> at
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
>> Source) ~[?:?]
>> at java.util.concurrent.CompletableFuture.postComplete(Unknown
>> Source) ~[?:?]
>> at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown
>> Source) ~[?:?]
>> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown
>> Source) ~[?:?]
>> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>> Source) ~[?:?]
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>> ~[?:?]
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> ~[?:?]
>> at java.lang.Thread.run(Unknown Source) ~[?:?]
>> Caused by: java.util.concurrent.CompletionException:
>> org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint
>> storage at checkpoint coordinator side.
>> at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown
>> Source) ~[?:?]
>> at java.util.concurrent.CompletableFuture.completeThrowable(Unknown
>> Source) ~[?:?]
>> at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown
>> Source) ~[?:?]
>> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown
>> Source) ~[?:?]
>> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>> Source) ~[?:?]
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>> ~[?:?]
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> ~[?:?]
>> at java.lang.Thread.run(Unknown Source) ~[?:?]
>> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create
>> checkpoint storage at checkpoint coordinator side.
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:324)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:240)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at
>> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.enableCheckpointing(DefaultExecutionGraph.java:448)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at
>> org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:311)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at
>> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:107)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at
>> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at
>> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:190)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:120)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at
>> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at
>> 

Re: High Flink checkpoint Size

2021-06-23 Thread Vijayendra Yadav
Thanks Yun. Let me try options provided below.

Thanks,
Vijay

> On Jun 23, 2021, at 4:51 AM, Yun Tang  wrote:
> 
> 
> Hi Vijay,
> 
> To be honest, an 18MB checkpoint size in total might not be anything 
> serious. If you really want to dig into what is inside, you could use 
> Checkpoints#loadCheckpointMetadata [1] to load the _metadata and see whether 
> anything is unexpected.
> 
> And you could refer to FlinkKafkaConsumerBase#unionOffsetStates [2] and 
> FlinkKinesisConsumer#sequenceNumsToRestore [3] to compare the operator 
> state stored by the Kafka and Kinesis connectors.
> 
> [1] 
> https://github.com/apache/flink/blob/10146366bec7feca85acedb23184b99517059bc6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99
> [2] 
> https://github.com/apache/flink/blob/10146366bec7feca85acedb23184b99517059bc6/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L201
> [3] 
> https://github.com/apache/flink/blob/10146366bec7feca85acedb23184b99517059bc6/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L158-L159
> 
> Best,
> Yun Tang
> From: Vijayendra Yadav 
> Sent: Wednesday, June 23, 2021 11:02
> To: user 
> Subject: High Flink checkpoint Size
>  
> Hi Team,
> 
> I have two flink Streaming Jobs 
> 1) Flink streaming from KAFKA and writing to s3
> 2) Flink streaming from KINESIS (KDS) and writing to s3
> 
> Both Jobs have similar checkpoint duration. 
> 
> Job #1 (KAFKA) checkpoint size is only 85KB 
> Job #2 (KINESIS) checkpoint size is 18MB
> 
> There are no checkpoint failures, but I want to understand why the Kinesis 
> job has such a huge checkpoint size. Is there a way to handle it 
> differently and reduce the size? 
> 
> Thanks,
> Vijay


Re: "Legacy Source Thread" line in logs

2021-06-23 Thread Fabian Paul
Hi Debraj,

By "Legacy Source Thread" we refer to all sources that do not yet implement the new 
source interface [1]. Currently only the Hive, Kafka and FileSource
have been migrated. In general, there is no severe downside to using the older 
sources, but in the future we plan to implement only sources based on 
the new operator model.

Best,
Fabian

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
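For reference, a hedged sketch of what a FLIP-27 style source looks like, compared to the legacy FlinkKafkaConsumer; the broker, topic and group id are illustrative (Flink 1.12+ Kafka connector assumed).

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NewSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // FLIP-27 style source: runs on the new source operator, so no "Legacy Source Thread".
        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")
            .setTopics("input-topic")
            .setGroupId("my-group")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute("new-source-sketch");
    }
}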



"Legacy Source Thread" line in logs

2021-06-23 Thread Debraj Manna
Hi

I am seeing the below logs in flink 1.13.0 running in YARN

2021-06-23T13:41:45.761Z WARN grid.task.MetricSdmStalenessUtils Legacy
Source Thread - Source: MetricSource -> Filter -> MetricStoreMapper ->
(Filter -> Timestamps/Watermarks -> Map -> Flat Map, Sink:
FlinkKafkaProducer11, Sink: TSDBSink14) (1/1)#0
updateMetricStalenessInHisto:32 Received a non-positive staleness = -194239
at 1624455705761

Can someone let me know what the "Legacy Source Thread" denotes?

I saw the same question here

in the deprecated mailing list with no answer, so I am starting a new email
thread here.


Re: High Flink checkpoint Size

2021-06-23 Thread Yun Tang
Hi Vijay,

To be honest, an 18MB checkpoint size in total might not be anything serious. 
If you really want to dig into what is inside, you could use 
Checkpoints#loadCheckpointMetadata [1] to load the _metadata and see whether 
anything is unexpected.

And you could refer to FlinkKafkaConsumerBase#unionOffsetStates [2] and 
FlinkKinesisConsumer#sequenceNumsToRestore [3] to compare the operator state 
stored by the Kafka and Kinesis connectors.

[1] 
https://github.com/apache/flink/blob/10146366bec7feca85acedb23184b99517059bc6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99
[2] 
https://github.com/apache/flink/blob/10146366bec7feca85acedb23184b99517059bc6/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L201
[3] 
https://github.com/apache/flink/blob/10146366bec7feca85acedb23184b99517059bc6/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L158-L159
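A hedged sketch of that inspection; the checkpoint path is illustrative, and the exact loadCheckpointMetadata signature differs slightly between Flink versions.

import java.io.DataInputStream;
import java.io.FileInputStream;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;

public class InspectCheckpointMetadata {
    public static void main(String[] args) throws Exception {
        String path = "/path/to/checkpoint/chk-42/_metadata";
        try (DataInputStream in = new DataInputStream(new FileInputStream(path))) {
            CheckpointMetadata metadata = Checkpoints.loadCheckpointMetadata(
                in, InspectCheckpointMetadata.class.getClassLoader(), path);
            for (OperatorState operatorState : metadata.getOperatorStates()) {
                // Print per-operator state sizes to see which operator dominates the checkpoint.
                System.out.println(operatorState.getOperatorID()
                        + " -> " + operatorState.getStateSize() + " bytes");
            }
        }
    }
}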

Best,
Yun Tang

From: Vijayendra Yadav 
Sent: Wednesday, June 23, 2021 11:02
To: user 
Subject: High Flink checkpoint Size

Hi Team,

I have two flink Streaming Jobs
1) Flink streaming from KAFKA and writing to s3
2) Flink streaming from KINESIS (KDS) and writing to s3

Both Jobs have similar checkpoint duration.

Job #1 (KAFKA) checkpoint size is only 85KB
Job #2 (KINESIS) checkpoint size is 18MB

There are no checkpoint failures, but I want to understand why the Kinesis 
job has such a huge checkpoint size. Is there a way to handle it 
differently and reduce the size?

Thanks,
Vijay


Re: Processing-time temporal join is not supported yet

2021-06-23 Thread Leonard Xu
The dimension-table state is retained; expired data is cleaned up based on the watermark.

Best,
Leonard


> On Jun 23, 2021, at 19:20, op <520075...@qq.com.INVALID> wrote:
> 
> Thanks. Does an event-time temporal join keep the latest state for every key of 
> the temporal table? The official docs say it depends on the watermarks of both sides, and I don't quite understand that...
> 
> 
> 
> 
> ------ Original message ------
> From:  
>   "user-zh"   
>  
> Sent: Wednesday, June 23, 2021, 5:40 PM
> To: "user-zh" 
> Subject: Re: Processing-time temporal join is not supported yet
> 
> 
> 
> Hi,
> 
> Flink SQL currently supports event-time temporal joins against arbitrary 
> tables/views, but does not yet support processing-time temporal joins against 
> arbitrary tables/views (processing-time joins are supported against tables 
> that implement LookupTableSource).
> 
> The main reason processing-time temporal joins against arbitrary tables are not 
> supported yet is semantics. For processing-time joins, Flink SQL has no good 
> mechanism to guarantee that the dimension table is fully loaded before joining. 
> For example, if the Kafka topic backing the dimension stream contains 10 million 
> records, there is currently no way to load all of them before joining the records 
> arriving on the main stream, so during job startup, main-stream records that 
> should find a match may fail to join because the dimension table has not finished loading.
> 
> You can refer to https://issues.apache.org/jira/browse/FLINK-19830 
>  
> Best
> Leonard
> 
> 
> 
>  On Jun 23, 2021, at 17:03, op <520075...@qq.com.INVALID> wrote:
>  
>  Processing-time temporal join is not supported yet.



Re: Processing-time temporal join is not supported yet

2021-06-23 Thread op
Thanks. Does an event-time temporal join keep the latest state for every key of the 
temporal table? The official docs say it depends on the watermarks of both sides, and I don't quite understand that...




------ Original message ------
From: 
   "user-zh"

https://issues.apache.org/jira/browse/FLINK-19830 


Re: Processing-time temporal join is not supported yet

2021-06-23 Thread Leonard Xu
Hi,

Flink SQL currently supports event-time temporal joins against arbitrary tables/views, 
but does not yet support processing-time temporal joins against arbitrary tables/views 
(processing-time joins are supported against tables that implement LookupTableSource).

The main reason processing-time temporal joins against arbitrary tables are not supported 
yet is semantics. For processing-time joins, Flink SQL has no good mechanism to guarantee 
that the dimension table is fully loaded before joining. For example, if the Kafka topic 
backing the dimension stream contains 10 million records, there is currently no way to 
load all of them before joining the records arriving on the main stream, so during job 
startup, main-stream records that should find a match may fail to join because the 
dimension table has not finished loading.

You can refer to https://issues.apache.org/jira/browse/FLINK-19830 
 

Best
Leonard



> On Jun 23, 2021, at 17:03, op <520075...@qq.com.INVALID> wrote:
> 
>  Processing-time temporal join is not supported yet.



Re: Issues when using a file system as a plugin

2021-06-23 Thread Arvid Heise
This looks like a bug at first glance. Could you please open a ticket for
that?

If not, I'd do that tomorrow.

On Wed, Jun 23, 2021 at 6:36 AM Yaroslav Tkachenko <
yaroslav.tkache...@shopify.com> wrote:

> Hi everyone,
>
> I need to add support for the GCS filesystem. I have a working example
> where I add two JARs to the */opt/flink/lib*/ folder:
>
> - GCS Hadoop connector
> - *Shaded* Hadoop using flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
>
> Now I'm trying to follow the advice from
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/overview/#pluggable-file-systems
> and use Plugins instead. I followed the recommendation from here
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/plugins/.
> Now I have two JARs in the* /opt/flink/plugins/hadoop-gcs/* folder:
>
> - GCS Hadoop connector
> -* Non-shaded* Hadoop using hadoop-common-2.10.1.jar
>
> As I can see, shading is not required for plugins, so I want to make it
> work with a simple non-shaded hadoop-common. However, JobManager fails with
> the following exceptions:
>
>
>
> Caused by: org.apache.flink.runtime.client.JobInitializationException:
> Could not start the JobMaster.
> at
> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown
> Source) ~[?:?]
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
> Source) ~[?:?]
> at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
> ~[?:?]
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown
> Source) ~[?:?]
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> ~[?:?]
> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
> Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> ~[?:?]
> at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint
> storage at checkpoint coordinator side.
> at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown
> Source) ~[?:?]
> at java.util.concurrent.CompletableFuture.completeThrowable(Unknown
> Source) ~[?:?]
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown
> Source) ~[?:?]
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> ~[?:?]
> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
> Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> ~[?:?]
> at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create
> checkpoint storage at checkpoint coordinator side.
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:324)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:240)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.enableCheckpointing(DefaultExecutionGraph.java:448)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:311)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at
> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:107)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:190)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:120)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at
> 

Processing-time temporal join is not supported yet

2021-06-23 Thread op
Hi, I get an error when doing a temporal join with Kafka tables:
org.apache.flink.table.api.TableException: Processing-time temporal join is not 
supported yet.

The SQL is as follows:


create view visioned_table as
 select
 user_id,
 event
from
(select
  user_id,
  event,
  row_number() over(partition by user_id order by event_time desc) 
as rn
  from kafka_table1
  )ta where rn=1;

select
   t1.*,t2.*
  from mvp_rtdwd_event_app_quit t1
join visioned_table FOR SYSTEM_TIME AS OF t1.proc_time AS t2
 on t1.user_id=t2.user_id
 where t1.user_id is not null

Re: Flink TPC-DS 3TB BenchMark result is not good.

2021-06-23 Thread Jingsong Li
Thanks Yingjie for pinging me.

Hi vtygoss,

Leonard is right, maybe you are using the wrong statistics information.

This caused the optimizer to select the **BROADCAST JOIN**
incorrectly. Unfortunately, Flink needs to broadcast a huge amount of data,
even gigabytes. This is really the performance killer.

I think you can:
- analyze your tables in Hive as Leonard said.
- Or just remove "TpcdsStatsProvider.registerTpcdsStats(tEnv)"

And I see your code: "table.optimizer.join.broadcast-threshold: 256 * 1024
* 1024".

I think this threshold is too large. More than 10MB is not recommended.

Best,
Jingsong
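For reference, a hedged sketch of lowering that threshold on the TableConfig; the key is the one quoted above, 10 MB follows the suggestion here, and the environment setup mirrors the snippet later in this thread.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BroadcastThresholdSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Tables larger than this (estimated from catalog statistics) will not be broadcast;
        // setting -1 disables broadcast joins entirely.
        tEnv.getConfig().getConfiguration()
            .setLong("table.optimizer.join.broadcast-threshold", 10L * 1024 * 1024);
    }
}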

On Wed, Jun 23, 2021 at 11:08 AM Leonard Xu  wrote:

> Hi, vtygoss
>
> Thanks for the detailed report. A quick reply, as I wrote
> *org.apache.flink.table.tpcds.TpcdsTestProgram* in the community: I guess
> you missed the *table statistics information.*
>
> The *table statistics information* used in the TPC-DS e2e tests is
> constant for the 1GB verification data set; I wrote this test to check that
> Flink Batch SQL works for every PR as a CI test, rather than to measure
> performance. Please see
> *org.apache.flink.table.tpcds.stats.TpcdsStatsProvider*.
>
> The *table statistics information* is used by the planner (CBO
> optimizer) to optimize the SQL plan; incorrect *table statistics
> information* can even lead to a wrong plan, and the SQL job may run unexpectedly.
>
> Thus if you want to run the 3TB TPC-DS tests, you should use the
> corresponding *table statistics information* for your test data set, which you
> can obtain by analyzing your tables in Hive.
>
> Best,
> Leonard
>
> On Jun 23, 2021, at 10:42, Yingjie Cao  wrote:
>
> Hi,
>
> I also have some experience running the TPC-DS benchmark with Flink (10T
> scale), but the data shuffle amount of Q1 differs a lot from the numbers in
> the picture you shared. I am not sure what is going on; maybe you missed
> something? I attached the numbers of Q1 from my test (also with 500 max
> parallelism, though I used Flink version 1.13 instead of
> 1.12); the running time is 20s for 10T TPC-DS.
>
> There are some points I known which may influence the test results, hope
> these can help you:
> 1. Shuffle data compression. (Disabled in Flink by default, can be enabled
> by setting taskmanager.network.blocking-shuffle.compression.enabled to
> true);
> 2. Blocking shuffle type used. See [1] for more information. (To used
> sort-shuffle, the minimum version is 1.13);
> 3. Memory configuration, including network and managed memory size.
> 4. Sql optimization configuration. (I am not familiar with this,
> maybe @Jingsong Li has more knowledge about that).
>
> BTW, could you please share more information? For example, how many nodes
> in your cluster? Which type of disk do you use, SSD or HDD? How many
> available cores, disks and memory of each node? Could you also share the
> numbers of shuffle write and shuffle read of all stages of Spark?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
>
> 
>
> Best,
> Yingjie
>
> vtygoss  wrote on Tue, Jun 22, 2021 at 4:46 PM:
>
>> Hi,
>>
>> I am doing performance tests on 3TB TPC-DS using
>> flink/flink-end-to-end-tests/flink-tpcds-test module, but the test results
>> are not good.
>>
>> scenario:
>> tpc-ds location: hive 2.3.5
>> tpc-ds scale: 3TB, parquet + snappy
>> flink version: flink-1.12-SNAPSHOT
>> resource configuration: slots per task manager=5, parallelism=500, job
>> manager memory=10GB, task manager memory=10GB, task manager number=100.
>>
>> differences between my branch and the community branch:
>> 1. tpc-ds is stored in Hive, so I changed the Source from CSV to Hive.
>> 2. I added some optimizations explicitly (join-reorder, broadcast, ...) as
>> shown below.
>> 3. The community tpcds test main class is org.apache.flink.table.tpcds.
>> TpcdsTestProgram; mine is org.apache.flink.table.tpcds.My
>> TpcdsTestProgram. Both are in
>> the attachment.
>>
>> Do I make something wrong? please help to offer some advices. thanks very
>> much!
>>
>> Best Regards
>>
>>
>> ```
>> [MyTpcdsTestProgram.java] (My Branch)
>>
>> private static TableEnvironment prepareTableEnv(String sourceTablePath, 
>> Boolean useTableStats) {
>> // init Table Env
>> EnvironmentSettings environmentSettings =
>> 
>> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>>
>> TableEnvironment tEnv = TableEnvironment.create(environmentSettings);
>>
>> tEnv.loadModule("custom-hive", new HiveModule("2.3.5"));
>> String hiveSite = "/home/work/flink-1.12/conf/";
>> HiveCatalog catalog = new HiveCatalog("hive", "tpcds3t", hiveSite);
>> tEnv.registerCatalog("hive", catalog);
>> tEnv.useCatalog("hive");
>> tEnv.useDatabase("tpcds3t");
>> // config Optimizer parameters
>> tEnv.getConfig()
>> .getConfiguration()
>> 
>> 

Re: multiple jobs in same flink app

2021-06-23 Thread Qihua Yang
Hi Robert,

But I saw the Flink docs say application mode can run multiple jobs? Or do I
misunderstand it?
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/



*Compared to the Per-Job mode, the Application Mode allows the
submission of applications consisting of multiple jobs. The order of
job execution is not affected by the deployment mode but by the call
used to launch the job. Using execute(), which is blocking,
establishes an order and it will lead to the execution of the "next"
job being postponed until "this" job finishes. Using executeAsync(),
which is non-blocking, will lead to the "next" job starting before
"this" job finishes.*


On Tue, Jun 22, 2021 at 11:13 PM Robert Metzger  wrote:

> Hi Qihua,
>
> Application Mode is meant for executing one job at a time, not multiple
> jobs on the same JobManager.
> If you want to do that, you need to use session mode, which allows
> managing multiple jobs on the same JobManager.
>
> On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang  wrote:
>
>> Hi Arvid,
>>
>> Do you know if I can start multiple jobs for a single flink application?
>>
>> Thanks,
>> Qihua
>>
>>
>> On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang  wrote:
>>
>>> Hi,
>>>
>>> I am using application mode.
>>>
>>> Thanks,
>>> Qihua
>>>
>>> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise  wrote:
>>>
 Hi Qihua,

 Which execution mode are you using?

 On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang  wrote:

> Hi,
>
> Thank you for your reply. What I want is a Flink app with multiple jobs,
> where each job manages a stream. Currently our Flink app has only 1 job that
> manages multiple streams.
> I did try env.executeAsync(), but it still doesn't work. From the log,
> when the second executeAsync() was called, it shows " *Job
>  was recovered successfully.*"
> Looks like the second executeAsync() recovered the first job instead of
> starting a second job.
>
> Thanks,
> Qihua
>
>
> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise  wrote:
>
>> Hi,
>>
>> env.execute("Job 1"); is a blocking call. You either have to use
>> executeAsync or use a separate thread to submit the second job. If Job 1
>> finishes then this would also work by having sequential execution.
>>
>> However, I think what you actually want to do is to use the same env
>> with 2 topologies and 1 single execute like this.
>>
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream stream1 = env.addSource(new
>> SourceFunction());
>> stream1.addSink(new FlinkKafkaProducer());
>> DataStream stream2 = env.addSource(new
>> SourceFunction());
>> stream2.addSink(new DiscardingSink<>());
>> env.execute("Job 1+2");
>>
>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang  wrote:
>>
>>> Hi,
>>> Does anyone know how to run multiple jobs in same flink application?
>>> I did a simple test.  First job was started. I did see the log
>>> message, but I didn't see the second job was started, even I saw the log
>>> message.
>>>
>>> public void testJobs() throws Exception {
>>> StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> DataStream stream1 = env.addSource(new
>>> SourceFunction());
>>> stream1.addSink(new FlinkKafkaProducer());
>>> printf("### first job");
>>> env.execute("Job 1");
>>>
>>> env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> DataStream stream2 = env.addSource(new
>>> SourceFunction());
>>> stream2.addSink(new DiscardingSink<>());
>>> printf("### second job");
>>> env.execute("Job 2");
>>> }
>>>
>>> Here is the log:
>>> ### first job
>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>>> Job  is submitted.
>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>>> Submitting Job with JobId=.
>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>> Received JobGraph submission  (job1).
>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>> Submitting job  (job1).
>>>
>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>> execution of job job1 () under job 
>>> master
>>> id b03cde9dc2aebdb39c46cda4c2a94c07.
>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>> scheduling with scheduling strategy
>>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
>>> INFO  

Re: PoJo to Avro Serialization throw KryoException: java.lang.UnsupportedOperationException

2021-06-23 Thread Yun Tang
Hi Rommel,

I wonder why the Avro type would use Kryo as its serializer. Could you check what 
kind of type information you get via TypeInformation.of(clazz)? [1]


[1] 
https://github.com/apache/flink/blob/cc3f85eb4cd3e5031a84321e62d01b3009a00577/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java#L208
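A hedged sketch of that check; GenericRecord is used as the example class here, and the interpretation in the comments is the usual one rather than something stated in this thread.

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class TypeInfoCheck {
    public static void main(String[] args) {
        // If this prints a GenericTypeInfo, Flink treats the class as a generic type and
        // serializes it with Kryo; an Avro-specific TypeInformation (e.g. the one supplied
        // via returns(new GenericRecordAvroTypeInfo(schema))) avoids the Kryo path.
        TypeInformation<GenericRecord> info = TypeInformation.of(GenericRecord.class);
        System.out.println(info);
    }
}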


Best
Yun Tang

From: Rommel Holmes 
Sent: Wednesday, June 23, 2021 13:43
To: user 
Subject: PoJo to Avro Serialization throw KryoException: 
java.lang.UnsupportedOperationException

My unit test was running OK under Flink 1.11.2 with parquet-avro 1.10.0; once I 
upgraded to 1.12.0 with parquet-avro 1.12.0, my unit test throws

com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) 
~[kryo-2.24.0.jar:?]
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
 ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) 
~[kryo-2.24.0.jar:?]
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
 ~[kryo-2.24.0.jar:?]
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) 
~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) 
~[kryo-2.24.0.jar:?]
...

Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057) 
~[?:1.8.0_282]
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
 ~[kryo-2.24.0.jar:?]
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
 ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) 
~[kryo-2.24.0.jar:?]
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) 
~[kryo-2.24.0.jar:?]
... 27 more
My Unit test code snippet is something like below:

private ImmutableList testData = ImmutableList.of(
PoJo.build("123", "0.0.0.0", null),
PoJo.build("123", "0.0.0.1", 2L)
);

DataStream input = env
.addSource(new TestSource(testData), 
PojoTypeInfo.of(PoJo.class))
.assignTimestampsAndWatermarks(watermarkStrategy);

DataStream output = input
.map(TestClass::convertPoJoToGenericRecord)
.returns(new GenericRecordAvroTypeInfo(PoJo.getAvroSchema()));

output.addSink();

The function is something like

GenericRecord convertPoJoToGenericRecord(PoJo pojo) throws Exception {
Schema schema = PoJo.getAvroSchema();
GenericRecordBuilder builder = new GenericRecordBuilder(schema);
for (Schema.Field field : schema.getFields()) {
builder.set(field.name(), 
TestClass.getObjectField(field, pojo));
}
GenericRecord record = builder.build();
return record;
}

Can anyone help on this?

Thank you


Re: Chinese tutorials not being updated promptly

2021-06-23 Thread Jark Wu
Hi Kevin,

Welcome to the Apache Flink open-source community! As Yun Tang said, the community
warmly welcomes and values every contribution. However, maintaining the Chinese
documentation is a very large effort that touches all modules and needs the
cooperation of committers from many modules, so updates sometimes inevitably lag behind.

If you find an untranslated page with no related JIRA issue, you can create the issue
and submit a PR directly. If an issue and PR already exist, you can help review them;
what the community lacks most right now is high-quality reviewers, and that would
speed up many of the translations.

Best,
Jark

On Wed, 23 Jun 2021 at 11:04, Yun Tang  wrote:

> Hi Kevin,
>
> Welcome to the Apache Flink open-source community!
>
> Because open-source community work often happens outside of participants' working
> hours, it is hard to avoid progress updates being delayed, or contributors going
> inactive for long stretches.
>
> You are very welcome to comment on the relevant JIRA
> tickets and ask for permission to create PRs. The community always welcomes every
> contributor, and documentation maintenance, especially the translation of the Chinese
> docs, is very much needed. If there is anything you would like to contribute, feel free
> to comment directly under the JIRA ticket or the GitHub PR, or to create the relevant ticket.
>
> Best,
> Yun Tang
> --
> *From:* pang fan 
> *Sent:* Monday, June 21, 2021 21:35
> *To:* user-zh@flink.apache.org 
> *Subject:* Chinese tutorials not being updated promptly
>
> Hi all,
>
> I am a beginner with Flink. While following the official tutorial (Chinese version) at
>
> https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/try-flink/table_api/
>
> I found that many of the Chinese tutorials have not been translated, yet the PR history
> shows that many PRs were already opened but never merged into the main branch; many of
> them were opened months ago and have not been updated since.
>
> Is anyone still following up on these issues? If so, could you update the status of the
> JIRA tickets and the PRs, so that we can also claim tickets where needed and make some
> contributions to the community?
>
>
> Thanks!
> Kevin Fan
>


Re: multiple jobs in same flink app

2021-06-23 Thread Robert Metzger
Hi Qihua,

Application Mode is meant for executing one job at a time, not multiple
jobs on the same JobManager.
If you want to do that, you need to use session mode, which allows managing
multiple jobs on the same JobManager.

On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang  wrote:

> Hi Arvid,
>
> Do you know if I can start multiple jobs for a single flink application?
>
> Thanks,
> Qihua
>
>
> On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang  wrote:
>
>> Hi,
>>
>> I am using application mode.
>>
>> Thanks,
>> Qihua
>>
>> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise  wrote:
>>
>>> Hi Qihua,
>>>
>>> Which execution mode are you using?
>>>
>>> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang  wrote:
>>>
 Hi,

 Thank you for your reply. What I want is a Flink app with multiple jobs,
 where each job manages a stream. Currently our Flink app has only 1 job that
 manages multiple streams.
 I did try env.executeAsync(), but it still doesn't work. From the log,
 when the second executeAsync() was called, it shows " *Job
  was recovered successfully.*"
 Looks like the second executeAsync() recovered the first job instead of
 starting a second job.

 Thanks,
 Qihua


 On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise  wrote:

> Hi,
>
> env.execute("Job 1"); is a blocking call. You either have to use
> executeAsync or use a separate thread to submit the second job. If Job 1
> finishes then this would also work by having sequential execution.
>
> However, I think what you actually want to do is to use the same env
> with 2 topologies and 1 single execute like this.
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream stream1 = env.addSource(new
> SourceFunction());
> stream1.addSink(new FlinkKafkaProducer());
> DataStream stream2 = env.addSource(new
> SourceFunction());
> stream2.addSink(new DiscardingSink<>());
> env.execute("Job 1+2");
>
> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang  wrote:
>
>> Hi,
>> Does anyone know how to run multiple jobs in same flink application?
>> I did a simple test.  First job was started. I did see the log
>> message, but I didn't see the second job was started, even I saw the log
>> message.
>>
>> public void testJobs() throws Exception {
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream stream1 = env.addSource(new
>> SourceFunction());
>> stream1.addSink(new FlinkKafkaProducer());
>> printf("### first job");
>> env.execute("Job 1");
>>
>> env = StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream stream2 = env.addSource(new
>> SourceFunction());
>> stream2.addSink(new DiscardingSink<>());
>> printf("### second job");
>> env.execute("Job 2");
>> }
>>
>> Here is the log:
>> ### first job
>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>> Job  is submitted.
>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>> Submitting Job with JobId=.
>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>> Received JobGraph submission  (job1).
>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>> Submitting job  (job1).
>>
>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>> execution of job job1 () under job master
>> id b03cde9dc2aebdb39c46cda4c2a94c07.
>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>> scheduling with scheduling strategy
>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
>> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job
>> job1 () switched from state CREATED to
>> RUNNING.
>>
>> ### second job
>> WARN  com.doordash.flink.common.utils.LoggerUtil  - Class -
>> IndexWriter : ### second job
>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved
>> ResourceManager address, beginning registration
>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>> Starting ZooKeeperLeaderRetrievalService
>> /leader//job_manager_lock.
>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
>> .
>>