Is it possible to use OperatorState, when NOT implementing a source or sink function?

2021-06-04 Thread Marco Villalobos
Is it possible to use OperatorState, when NOT implementing a source or sink
function?

If yes, then how?
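For reference: operator state is not limited to sources and sinks. Any user function can implement CheckpointedFunction and obtain operator (non-keyed) ListState from the OperatorStateStore. A minimal sketch, where the buffering logic is purely illustrative:

```
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

// Sketch: operator state in a flatMap (not a source/sink) via CheckpointedFunction.
public class BufferingFlatMap extends RichFlatMapFunction<String, String>
        implements CheckpointedFunction {

    private transient ListState<String> bufferState; // operator (non-keyed) list state
    private final java.util.List<String> buffer = new java.util.ArrayList<>();

    @Override
    public void flatMap(String value, Collector<String> out) {
        buffer.add(value);
        out.collect(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        bufferState.clear();
        bufferState.addAll(buffer); // persist the in-memory buffer on each checkpoint
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        bufferState = ctx.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("buffer", String.class));
        if (ctx.isRestored()) {
            for (String s : bufferState.get()) {
                buffer.add(s);
            }
        }
    }
}
```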


Re: Flink checkpoint is very slow - troubleshooting

2021-06-04 Thread Jacob
thanks,

I went through the related documentation [1]. The interactions with Redis and HBase are numerous and scattered; there are not only lookups but also writes back to Redis.

I plan to move the whole logic of the original map operator into asyncInvoke(), running it on a thread pool. I am not sure whether that is appropriate; wouldn't the ordering of the data then no longer be guaranteed?



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/asyncio/
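For illustration, the pattern described above (running the existing blocking Redis/HBase logic on a thread pool inside asyncInvoke()) could look roughly like the sketch below; the pool size and the blockingEnrich() method are placeholders. Ordering can still be preserved by attaching the function with AsyncDataStream.orderedWait rather than unorderedWait, at some cost in latency:

```
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Sketch only: runs the existing blocking lookup/enrichment logic on a thread pool.
public class AsyncEnrichFunction extends RichAsyncFunction<String, String> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(32); // pool size is an assumption
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        CompletableFuture
            .supplyAsync(() -> blockingEnrich(input), executor) // old map logic, incl. Redis/HBase calls
            .thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
    }

    private String blockingEnrich(String input) {
        // placeholder for the original synchronous Redis/HBase read and write-back
        return input;
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}

// Attaching it with ordered results keeps the input order:
// DataStream<String> enriched =
//     AsyncDataStream.orderedWait(source, new AsyncEnrichFunction(), 30, TimeUnit.SECONDS, 100);
```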

  



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Flink sql regular join not working as expect.

2021-06-04 Thread 1095193...@qq.com
Thanks @JING ZHANG @Yun Gao. I will use a processing-time temporal join. BTW, will
reading a changelog from the JDBC source be supported in the future when it works as the
right stream of a regular join?



1095193...@qq.com
 
From: JING ZHANG
Date: 2021-06-04 18:32
To: Yun Gao
CC: 1095193...@qq.com; user
Subject: Re: Flink sql regular join not working as expect.
Hi,
The JDBC source only takes a snapshot and sends all data in that snapshot
downstream when it works as the right stream of a regular join; it cannot
produce a changelog stream.
After you update the field 'target' from '56.32.15.55:8080' to
'56.32.15.54:8080', the JDBC source will not send the new data downstream.

You could try to use Upsert Kafka [1] as the right side of the regular join and set
`source` as the primary key.

BTW, if you use a Processing Time Temporal Join [2] in your case, you would always
join against the latest version of the dimension table, but updates on the dimension table
would not trigger new join results, because the dimension table is only used for lookups by key.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins
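For illustration, a processing-time temporal (lookup) join against the JDBC table could be written as sketched below; it assumes test1 is declared with an extra column `proc_time AS PROCTIME()`:

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch: processing-time temporal (lookup) join reading the JDBC table by key at processing time.
public class LookupJoinExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // ... CREATE TABLE test1 / test2 DDL as in the original mail, plus proc_time on test1 ...
        tEnv.executeSql(
                "SELECT t1.source, t2.target "
                    + "FROM test1 AS t1 "
                    + "JOIN test2 FOR SYSTEM_TIME AS OF t1.proc_time AS t2 "
                    + "ON t1.source = t2.source")
            .print();
    }
}
```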

Best regards,
JING ZHANG


Yun Gao wrote on Fri, Jun 4, 2021 at 5:07 PM:
Hi, 

I'm not an expert on the Table/SQL API, but it seems to me that for regular
joins, Flink does not re-read the dimension
table after it has read it fully for the first time. If you want to always join
the records with the latest version of
the dimension table, you may need to use temporal joins [1].

Best,
Yun


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins


--
From:1095193...@qq.com <1095193...@qq.com>
Send Time:2021 Jun. 4 (Fri.) 16:45
To:user 
Subject:Flink sql regular join not working as expect.

Hi
   I am working on joining a Kafka stream with a Postgres dimension table.
According to:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/
 "Regular joins are the most generic type of join in which any new record, or
changes to either side of the join, are visible and affect the entirety of the
join result."
   However, in my test, changing a record in the dimension table does not affect the
result of the join.  My test steps:
   1. create Kafka table sql 
  CREATE TABLE test1 (  source String )  WITH (  'connector' = 'kafka',   
'topic' = 'test' ...)
   2.create dimension table sql 
 CREATE TABLE test2 (source String, target String)  WITH  ( 'connector' 
= 'jdbc'... )
 Prepared 1 record in the dimension table:
     source             |   target
     172.16.1.109:8080  |   56.32.15.55:8080
   3. regular join sql
   select test1.source, test2.target from test1 join test2 on test1.source 
= test2.source
   4. feed data into Kafka
  {"source":"172.16.1.109:8080"}
  Flink outputs the result as expected:  +I[172.16.1.109:8080,
56.32.15.55:8080]
   5. change the field 'target' from '56.32.15.55:8080' to '56.32.15.54:8080' in the
dimension table:
      source             |   target
      172.16.1.109:8080  |   56.32.15.54:8080
   6. feed data into Kafka
  {"source":"172.16.1.109:8080"}
  Flink still outputs a result unaffected by the change to the dimension table:
+I[172.16.1.109:8080, 56.32.15.55:8080]
  Expect result:  +I[172.16.1.109:8080, 56.32.15.54:8080]
Could you give me some suggestions as to why the regular join result is not affected
by changes to the dimension table in my test? Much appreciated.



1095193...@qq.com



Elasticsearch sink connector timeout

2021-06-04 Thread Kai Fu
Hi team,

We encountered an issue with the ES sink connector timing out quite frequently.
As far as we checked, the ES cluster is far from being loaded (~40% CPU utilization, no
queries, and the index rate is also low). We're using the ES-7 connector, with 12 data
nodes and a parallelism of 32.

The error log is below; we want to know if there is any way to locate
the issue or to configure the timeout parameter.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/
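For reference, when the DataStream ElasticsearchSink (flink-connector-elasticsearch7) can be used directly, the REST client's timeouts can be raised through a custom RestClientFactory. A rough sketch with example values; the hosts, index name and timeout numbers are assumptions:

```
import java.util.Collections;
import java.util.List;

import org.apache.http.HttpHost;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

// Sketch: building a DataStream ElasticsearchSink with larger REST-client timeouts.
public class EsSinkWithTimeouts {

    public static ElasticsearchSink<String> build() {
        List<HttpHost> hosts = Collections.singletonList(new HttpHost("es-host", 9200, "http"));

        ElasticsearchSinkFunction<String> emitter =
                (element, ctx, indexer) ->
                        indexer.add(
                                Requests.indexRequest()
                                        .index("my-index") // index name is an example
                                        .source(Collections.singletonMap("data", element), XContentType.JSON));

        ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(hosts, emitter);

        // Raise the HTTP client timeouts (the default socket timeout is 30 s, matching the error below).
        builder.setRestClientFactory(
                restClientBuilder ->
                        restClientBuilder.setRequestConfigCallback(
                                requestConfigBuilder ->
                                        requestConfigBuilder
                                                .setConnectTimeout(5_000)
                                                .setSocketTimeout(60_000)));

        builder.setBulkFlushMaxActions(1000); // optional: tune bulk sizing as well
        return builder.build();
    }
}
```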

2021-06-05 11:49:10
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:432)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:329)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
at org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog(DeduplicateFunctionHelper.java:112)
at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:80)
at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:32)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-21 [ACTIVE]
at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
at

Re: Flink exported metrics scope configuration

2021-06-04 Thread Kai Fu
Hi Mason,

Thank you for the advice. I tried it; it works and reduces the size a lot.
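For readers hitting the same problem, the setting Mason describes below goes into flink-conf.yaml. A minimal example, assuming a Prometheus reporter named `prom` (the exact variables to exclude should be taken from the variables list in the docs):

```
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.scope.variables.excludes: task_name
```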

On Fri, Jun 4, 2021 at 11:45 AM Mason Chen  wrote:

> Hi Kai,
>
> You can use the excluded variables config for the reporter.
>
>    - metrics.reporter.<name>.scope.variables.excludes: (optional) A
>    semi-colon (;) separated list of variables that should be ignored by
>    tag-based reporters (e.g., Prometheus, InfluxDB).
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/metric_reporters/#reporter
>
> Best,
> Mason
>
> On Jun 3, 2021, at 9:31 PM, Kai Fu  wrote:
>
> Hi team,
>
> We noticed that Prometheus metrics exporter exports all of the metrics at
> the most fine-grained level, which is tremendous for the prometheus server
> especially when the parallelism is high. The metrics volume crawled from a
> single host (parallelism 8) is around 40MB for us currently. This is due to the
> task_name attribute in the metrics generated by the engine being very long. The
> task_name attribute is auto-generated from the SQL job, and it seems it
> attaches all field names to it.
>
> We want to reduce the metrics volume by either dropping task_name or reporting at
> some more coarse-grained level. But I cannot find any related documents about
> this; any advice on that?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/metric_reporters/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#list-of-all-variables
>
> --
> *Best wishes,*
> *- Kai*
>
>
>

-- 
*Best wishes,*
*- Kai*


Re: Corrupted unaligned checkpoints in Flink 1.11.1

2021-06-04 Thread Alexander Filipchik
Looked through the logs and didn't see anything fishy that indicated an
exception during checkpointing.
To make it clearer, here is the timeline (we use unaligned checkpoints, and
state size around 300Gb):

T1: Job1 was running
T2: Job1 was savepointed, brought down and replaced with Job2.
T3: Attempts to savepoint Job2 failed (timed out). Job2 was cancelled,
brought down and replaced by Job3, which was restored from the externalized
checkpoint of Job2
T3: Attempts to savepoint Job3 failed (timed out). Job3 was cancelled,
brought down and replaced by Job4, which was restored from the externalized
checkpoint of Job3
T4: We realized that jobs were timing out on savepoints due to local disk
throttling. We provisioned disks with more throughput and IO. Job4 was
cancelled, Job4 was deployed and restored from the externalized checkpoint of
Job3, but failed as it couldn't find some files in the folder that belongs
to the checkpoint of *Job1*
T5: We tried to redeploy and restore from checkpoints of Job3 and Job2, but
all the attempts failed on reading files from the *folder that belongs to
the checkpoint of Job1*

We checked the content of the folder containing the checkpoints of Job1, and it
has files. We are not sure what is pointing to the missing files and what could have
removed them.

Any way we can figure out what could've happened? Is there a tool that can
read the checkpoint and check whether it is valid?
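For what it is worth, one way to probe whether an externalized checkpoint is still readable is the State Processor API (flink-state-processor-api), which loads a checkpoint like a savepoint and reads its state files. A rough sketch for Flink 1.11, where the operator uid, state name and paths are hypothetical placeholders:

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

// Rough sketch: try to read keyed state out of an externalized checkpoint.
public class CheckpointProbe {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ExistingSavepoint checkpoint = Savepoint.load(
                env,
                "gs://bucket/app/checkpoints/<job-id>/chk-1234",      // placeholder path
                new RocksDBStateBackend("gs://bucket/app/checkpoints"));

        DataSet<String> keys = checkpoint.readKeyedState("my-operator-uid", new Reader());

        // If the shared/exclusive files are still present and readable, this count succeeds;
        // a missing file surfaces as the same FileNotFoundException seen at restore time.
        System.out.println("restored keys: " + keys.count());
    }

    static class Reader extends KeyedStateReaderFunction<String, String> {
        private ValueState<String> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("my-state", String.class)); // hypothetical state name/type
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key);
        }
    }
}
```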

Alex

On Thu, Jun 3, 2021 at 2:12 PM Alexander Filipchik 
wrote:

> On the checkpoints -> what kind of issues should I check for? I was
> looking for metrics and it looks like they were reporting successful
> checkpoints. It looks like some files were removed in the shared folder,
> but I'm not sure how to check for what caused it.
>
> Savepoints were failing due to the savepoint timeout. Based on
> metrics, our attached disks were not fast enough (GCS regional disks are
> network disks and were throttled). The team cancelled the savepoint and
> just killed the kubernetes cluster. I assume some checkpoints were
> interrupted as the job triggers them one after another.
>
> Is there a known issue with termination during running checkpoint?
>
> Btw, we use the Flink Kube operator from Lyft.
>
> Alex
>
> On Thu, Jun 3, 2021 at 1:24 AM Chesnay Schepler 
> wrote:
>
>> Is there anything in the Flink logs indicating issues with writing the
>> checkpoint data?
>> When the savepoint could not be created, was anything logged from Flink?
>> How did you shut down the cluster?
>>
>> On 6/3/2021 5:56 AM, Alexander Filipchik wrote:
>>
>> Hi,
>>
>> Trying to figure out what happened with our Flink job. We use flink
>> 1.11.1 and run a job with unaligned checkpoints and Rocks Db backend. The
>> whole state is around 300Gb judging by the size of savepoints.
>>
>> The job ran ok. At some point we tried to deploy new code, but we
>> couldn't take a save point as they were timing out. It looks like the
>> reason it was timing out was due to disk throttle (we use google regional
>> disks).
>> The new code was deployed using an externalized checkpoint, but it didn't
>> start as job was failing with:
>>
>> Caused by: java.io.FileNotFoundException: Item not found:
>> 'gs://../app/checkpoints/2834fa1c81dcf7c9578a8be9a371b0d1/shared/3477b236-fb4b-4a0d-be73-cb6fac62c007'.
>> Note, it is possible that the live version is still available but the
>> requested generation is deleted.
>> at com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions
>> .createFileNotFoundException(GoogleCloudStorageExceptions.java:45)
>> at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(
>> GoogleCloudStorageImpl.java:653)
>> at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(
>> GoogleCloudStorageFileSystem.java:277)
>> at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.(
>> GoogleHadoopFSInputStream.java:78)
>> at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(
>> GoogleHadoopFileSystemBase.java:620)
>> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
>> at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem
>> .java:120)
>> at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem
>> .java:37)
>> at org.apache.flink.core.fs.
>> PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(
>> PluginFileSystemFactory.java:127)
>> at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(
>> SafetyNetWrapperFileSystem.java:85)
>> at org.apache.flink.runtime.state.filesystem.FileStateHandle
>> .openInputStream(FileStateHandle.java:69)
>> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader
>> .downloadDataForStateHandle(RocksDBStateDownloader.java:126)
>> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader
>> .lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
>> at org.apache.flink.util.function.ThrowingRunnable
>> .lambda$unchecked$0(ThrowingRunnable.java:50)
>> at 

Question about State TTL and Interval Join

2021-06-04 Thread McBride, Chris
We currently have a flink 1.8 application deployed on Kinesis Data Analytics 
using the RocksDB State backend. Our application is joining across 3 different 
Kinesis streams using an interval join. We noticed that our checkpoint sizes
continue to increase over time; we eventually have OOM failures writing
checkpoints and need to restart the application without restoring from a
savepoint.

Does this kind of application require a state TTL on the join operator? I
assumed that since it is an interval join, events that fall outside of the lower
time bound would automatically be expired from the state. Is that a correct
assumption?

Thanks,
Chris



Unsubscribe

2021-06-04 Thread happiless
Unsubscribe


Sent from my iPhone

Re: Add control mode for flink

2021-06-04 Thread Peter Huang
I agree with Steven. This logic can be added to a dynamic config framework
that can be bound into Flink operators. We probably don't need to let the Flink
runtime handle it.

On Fri, Jun 4, 2021 at 8:11 AM Steven Wu  wrote:

> I am not sure if we should solve this problem in Flink. This is more like
> a dynamic config problem that probably should be solved by some
> configuration framework. Here is one post from google search:
> https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
>
> On Fri, Jun 4, 2021 at 7:09 AM 刘建刚  wrote:
>
>> Hi everyone,
>>
>>   Flink jobs are always long-running. When the job is running, users
>> may want to control the job but not stop it. The control reasons can be
>> different as following:
>>
>>1.
>>
>>Change data processing’ logic, such as filter condition.
>>2.
>>
>>Send trigger events to make the progress forward.
>>3.
>>
>>Define some tools to degrade the job, such as limit input qps,
>>sampling data.
>>4.
>>
>>Change log level to debug current problem.
>>
>>   The common way to do this is to stop the job, do modifications and
>> start the job. It may take a long time to recover. In some situations,
>> stopping jobs is intolerable, for example, the job is related to money or
>> important activities.So we need some technologies to control the running
>> job without stopping the job.
>>
>>
>> We propose to add control mode for flink. A control mode based on the
>> restful interface is first introduced. It works by these steps:
>>
>>
>>1. The user can predefine some logic which supports config control,
>>such as filter condition.
>>2. Run the job.
>>3. If the user wants to change the job's running logic, just send a
>>restful request with the responding config.
>>
>> Other control modes will also be considered in the future. More
>> introduction can refer to the doc
>> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
>> . If the community likes the proposal, more discussion is needed and a more
>> detailed design will be given later. Any suggestions and ideas are welcome.
>>
>>


Re: Failed to cancel a job using the STOP rest API

2021-06-04 Thread Thomas Wang
Hi Yun,

Thanks for your reply. We are not using any legacy source. For this
specific job, there is only one source that is using FlinkKafkaConsumer
which I assume has the correct cancel() method implemented.

Also could you suggest how I could use the "request-id" to get the
savepoint location?
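For reference (based on the generic monitoring REST API, so worth double-checking against 1.11): the trigger id returned by the stop call can usually be polled at the savepoint status endpoint, for example

```
GET http://<jobmanager>:<port>/jobs/<job-id>/savepoints/<request-id>
```

When the returned status is COMPLETED, the `operation.location` field of the response should contain the savepoint path.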

Thanks.

Thomas

On Fri, Jun 4, 2021 at 2:31 AM Yun Gao  wrote:

> Hi Thomas,
>
> I think you are right that the CLI is also using the same rest API
> underlying, and since
> the response of the rest API is ok and the savepoint is triggered
> successfully, I reckon
> that it might not be due to rest API process, and we might still first
> focus on the
> stop-with-savepoint process.
>
> Currently stop-with-savepoint would first do a savepoint, then cancel all
> the sources to
> stop the job. Thus are the sources all legacy source (namely the one using
> SourceFunction) ?
> and does the source implement the cancel() method correctly ?
>
> Best,
> Yun
>
> --
> From:Thomas Wang 
> Send Time:2021 Jun. 4 (Fri.) 12:55
> To:user 
> Subject:Failed to cancel a job using the STOP rest API
>
> Hi, Flink community,
>
> I'm trying to use the STOP rest API to cancel a job. So far, I'm seeing
> some inconsistent results. Sometimes, jobs could be cancelled successfully
> while other times, they couldn't. Either way, the POST request is accepted
> with a status code 202 and a "request-id".
>
> From the Flink UI, I can see the savepoint being completed successfully.
> However the job is still in running state afterwards. The CLI command
> `flink stop ` is working ok. I can use the CLI to stop the job and
> get the resulting savepoint location. If I understand this correctly, the
> CLI should be using the same REST API behind the scenes, isn't it?
>
> Here is my POST request URL: `http://
> .ec2.internal:33557/jobs/5dce58f02d1f5e739ab12f88b2f5f031/stop`.
>
> Here is the BODY of the request:
> `{"drain":false,"targetDirectory":"s3:///flink-savepoint"}`.
>
> I'm using Flink 1.11.2 Commit ID: DeadD0d0.
>
> Any suggestions on how I can debug this?
>
> Another question is, given the response "request-id", which endpoint
> should I query to get the status of the request? Most importantly, where
> can I get the expected savepoint location?
>
> Thanks.
>
> Thomas
>
>
>


Re:Re: open checkpoint, send message to kafka OutOfOrderSequenceException

2021-06-04 Thread SmileSmile






Hi,

After the failover it is still an OutOfOrderSequenceException. Even when I disable
checkpointing, the Kafka broker still returns OutOfOrderSequenceException to me.







At 2021-06-04 17:52:22, "Yun Gao"  wrote:

Hi,



Have you checked whether the error occurs during normal execution, or right after a failover?


Best,
Yun


--
From:SmileSmile 
Send Time:2021 Jun. 4 (Fri.) 11:07
To:user 
Subject:open checkpoint, send message to kafka OutOfOrderSequenceException


Dear all:
  The Flink version is 1.12.4 and the Kafka version is 1.1.1. The topology is very
simple: source --> flatmap --> sink, with checkpointing enabled. The job fails after a few
hours. The error message is:


Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: 
Failed to send data to Kafka: The broker received an out of order sequence 
number.
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
 
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:850)
 
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
 


The producer's properties are simple, using the default AT_LEAST_ONCE semantic:
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "5");

properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 
"1");
properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "2000");
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "524288");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "100");
properties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
"6");

My question is: how should I deal with this?




 




Re: Add control mode for flink

2021-06-04 Thread Steven Wu
I am not sure if we should solve this problem in Flink. This is more like a
dynamic config problem that probably should be solved by some configuration
framework. Here is one post from google search:
https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a

On Fri, Jun 4, 2021 at 7:09 AM 刘建刚  wrote:

> Hi everyone,
>
>   Flink jobs are always long-running. When the job is running, users
> may want to control the job but not stop it. The control reasons can be
> different as following:
>
>1.
>
>Change data processing’ logic, such as filter condition.
>2.
>
>Send trigger events to make the progress forward.
>3.
>
>Define some tools to degrade the job, such as limit input qps,
>sampling data.
>4.
>
>Change log level to debug current problem.
>
>   The common way to do this is to stop the job, do modifications and
> start the job. It may take a long time to recover. In some situations,
> stopping jobs is intolerable, for example, the job is related to money or
> important activities.So we need some technologies to control the running
> job without stopping the job.
>
>
> We propose to add control mode for flink. A control mode based on the
> restful interface is first introduced. It works by these steps:
>
>
>1. The user can predefine some logic which supports config control,
>such as filter condition.
>2. Run the job.
>3. If the user wants to change the job's running logic, just send a
>restful request with the responding config.
>
> Other control modes will also be considered in the future. More
> introduction can refer to the doc
> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
> . If the community likes the proposal, more discussion is needed and a more
> detailed design will be given later. Any suggestions and ideas are welcome.
>
>


Re: Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

2021-06-04 Thread Yuval Itzchakov
Thanks Timo 

On Fri, Jun 4, 2021, 17:13 Timo Walther  wrote:

> Hi Yuval,
>
> I would recommend option 2. Because esp. when it comes to state you
> should be in control what is persisted. There is no guarantee that the
> ExternalSerializer will not change in the future. It is only meant for
> shipping data as the input of the next operator.
>
> I would recommend to write some little tool that traverses
> `table.getResolvedSchema().toSourceDataType().getLogicalType()`, limits
> the types that you allow in state (e.g. no structured types) and
> translates it to TypeInformation.
>
> Regards,
> Timo
>
>
> On 04.06.21 16:05, Yuval Itzchakov wrote:
> > Hi Timo,
> > Thank you for the response.
> >
> > The tables being created in reality are based on arbitrary SQL code such
> > that I don't know what the schema actually is to create the
> > TypeInformation "by hand" and pass it on to the DataStream API.
> >
> > This leaves me with option 1, which leads to another question: If I have
> > some state records stored in RocksDB from a current running job in a
> > previous Flink version (1.9.3), will changing the TypeInformation from
> > TypeInformation[Row] to the ExternalTypeInformation now break the
> > compatibility of the state currently stored and cause them to be losed
> > essentially? My guy feeling says yes unless some form of backwards
> > compatibility is going to be written specifically for the usecase.
> >
> >
> > On Fri, Jun 4, 2021, 16:33 Timo Walther  > > wrote:
> >
> > Hi Yuval,
> >
> >
> > TypeConversions.fromDataTypeToLegacyInfo was only a utility to bridge
> > between TypeInformation and DataType until TypeInformation is not
> > exposed through the Table API anymore.
> >
> > Beginning from Flink 1.13 the Table API is able to serialize the
> > records
> > to the first DataStream operator via toDataStream or
> toChangelogStream.
> > Internally, it uses
> > org.apache.flink.table.runtime.typeutils.ExternalTypeInfo for that.
> The
> > binary representation is using internal data structures and
> conversion
> > will be performed during serialization/deserialization:
> >
> > conversion -> internal -> conversion
> >
> > You have two possibilities:
> >
> > 1) You simply call `tableEnv.toDataStream(table).getType()` and pass
> > this type on to the next operator.
> >
> > 2) You define your own TypeInformation as you would usually do it in
> > DataStream API without Table API.
> >
> > We might serialize `Row`s with `RowSerializer` again in the near
> > future.
> > But for now we went with the most generic solution that supports
> > everything that can come out of Table API.
> >
> > Regards,
> > Timo
> >
> > On 04.06.21 15:12, Yuval Itzchakov wrote:
> >  > When upgrading to Flink 1.13, I ran into deprecation warnings on
> >  > TypeConversions
> >  >
> >  > image.png
> >  >
> >  > The deprecation message states that this API will be deprecated
> > soon,
> >  > but does not mention the alternatives that can be used for these
> >  > transformations.
> >  >
> >  > My use case is that I have a table that needs to be converted
> into a
> >  > DataStream[Row] and in turn I need to apply some stateful
> >  > transformations on it. In order to do that I need the
> >  > TypeInformation[Row] produced in order to pass into the various
> > state
> >  > functions.
> >  >
> >  > @Timo Walther  > > I would love your help on this.
> >  > --
> >  > Best Regards,
> >  > Yuval Itzchakov.
> >
>
>


Re: Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

2021-06-04 Thread Timo Walther

Hi Yuval,

I would recommend option 2, because especially when it comes to state, you
should be in control of what is persisted. There is no guarantee that the
ExternalSerializer will not change in the future. It is only meant for
shipping data as the input of the next operator.


I would recommend writing a small tool that traverses
`table.getResolvedSchema().toSourceDataType().getLogicalType()`, limits
the types that you allow in state (e.g. no structured types), and
translates it to TypeInformation.
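A rough sketch of such a tool, restricted to a handful of primitive types plus rows and rejecting everything else; the mapping choices here are assumptions, not an official utility:

```
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

// Sketch: translate a (restricted) LogicalType into a DataStream TypeInformation.
public final class StateTypeInfoFactory {

    public static TypeInformation<?> toTypeInfo(LogicalType type) {
        switch (type.getTypeRoot()) {
            case BOOLEAN:
                return Types.BOOLEAN;
            case INTEGER:
                return Types.INT;
            case BIGINT:
                return Types.LONG;
            case DOUBLE:
                return Types.DOUBLE;
            case VARCHAR:
            case CHAR:
                return Types.STRING;
            case ROW:
                RowType rowType = (RowType) type;
                TypeInformation<?>[] fieldTypes = rowType.getChildren().stream()
                        .map(StateTypeInfoFactory::toTypeInfo)
                        .toArray(TypeInformation[]::new);
                return Types.ROW_NAMED(
                        rowType.getFieldNames().toArray(new String[0]), fieldTypes);
            default:
                // Deliberately reject anything we do not want to persist in state.
                throw new UnsupportedOperationException("Type not allowed in state: " + type);
        }
    }
}
```

It could be fed with the logical type obtained from the resolved schema as described above.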


Regards,
Timo


On 04.06.21 16:05, Yuval Itzchakov wrote:

Hi Timo,
Thank you for the response.

The tables being created in reality are based on arbitrary SQL code such 
that I don't know what the schema actually is to create the 
TypeInformation "by hand" and pass it on to the DataStream API.


This leaves me with option 1, which leads to another question: If I have 
some state records stored in RocksDB from a current running job in a 
previous Flink version (1.9.3), will changing the TypeInformation from 
TypeInformation[Row] to the ExternalTypeInformation now break the 
compatibility of the state currently stored and cause them to be losed 
essentially? My guy feeling says yes unless some form of backwards 
compatibility is going to be written specifically for the usecase.



On Fri, Jun 4, 2021, 16:33 Timo Walther > wrote:


Hi Yuval,


TypeConversions.fromDataTypeToLegacyInfo was only a utility to bridge
between TypeInformation and DataType until TypeInformation is not
exposed through the Table API anymore.

Beginning from Flink 1.13 the Table API is able to serialize the
records
to the first DataStream operator via toDataStream or toChangelogStream.
Internally, it uses
org.apache.flink.table.runtime.typeutils.ExternalTypeInfo for that. The
binary representation is using internal data structures and conversion
will be performed during serialization/deserialization:

conversion -> internal -> conversion

You have two possibilities:

1) You simply call `tableEnv.toDataStream(table).getType()` and pass
this type on to the next operator.

2) You define your own TypeInformation as you would usually do it in
DataStream API without Table API.

We might serialize `Row`s with `RowSerializer` again in the near
future.
But for now we went with the most generic solution that supports
everything that can come out of Table API.

Regards,
Timo

On 04.06.21 15:12, Yuval Itzchakov wrote:
 > When upgrading to Flink 1.13, I ran into deprecation warnings on
 > TypeConversions
 >
 > image.png
 >
 > The deprecation message states that this API will be deprecated
soon,
 > but does not mention the alternatives that can be used for these
 > transformations.
 >
 > My use case is that I have a table that needs to be converted into a
 > DataStream[Row] and in turn I need to apply some stateful
 > transformations on it. In order to do that I need the
 > TypeInformation[Row] produced in order to pass into the various
state
 > functions.
 >
 > @Timo Walther > I would love your help on this.
 > --
 > Best Regards,
 > Yuval Itzchakov.





Add control mode for flink

2021-06-04 Thread 刘建刚
Hi everyone,

  Flink jobs are always long-running. When the job is running, users
may want to control the job but not stop it. The control reasons can be
different as following:

   1.

   Change the data processing logic, such as a filter condition.
   2.

   Send trigger events to make the progress move forward.
   3.

   Define some tools to degrade the job, such as limiting input QPS or sampling
   data.
   4.

   Change the log level to debug the current problem.

  The common way to do this is to stop the job, make modifications and
start the job again. It may take a long time to recover. In some situations,
stopping jobs is intolerable, for example, when the job is related to money or
important activities. So we need some technology to control the running
job without stopping it.


We propose to add control mode for flink. A control mode based on the
restful interface is first introduced. It works by these steps:


   1. The user can predefine some logic which supports config control, such
   as filter condition.
   2. Run the job.
   3. If the user wants to change the job's running logic, just send a
   restful request with the responding config.

Other control modes will also be considered in the future. More
introduction can refer to the doc
https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
. If the community likes the proposal, more discussion is needed and a more
detailed design will be given later. Any suggestions and ideas are welcome.
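For comparison, the pattern available today for use case 1 (changing a filter condition at runtime) is to broadcast config updates from a separate stream into the operators via broadcast state. A minimal sketch where the streams and the key/value format are made up:

```
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: dynamic filter condition delivered via broadcast state (events/configUpdates are assumed streams).
public class DynamicFilterJob {

    static final MapStateDescriptor<String, String> CONFIG_DESCRIPTOR =
            new MapStateDescriptor<>("dynamic-config", Types.STRING, Types.STRING);

    public static DataStream<String> apply(DataStream<String> events, DataStream<String> configUpdates) {
        BroadcastStream<String> configBroadcast = configUpdates.broadcast(CONFIG_DESCRIPTOR);

        return events
                .connect(configBroadcast)
                .process(new BroadcastProcessFunction<String, String, String>() {

                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
                            throws Exception {
                        String keyword = ctx.getBroadcastState(CONFIG_DESCRIPTOR).get("filter.keyword");
                        if (keyword == null || value.contains(keyword)) {
                            out.collect(value); // the filter condition follows the latest broadcast config
                        }
                    }

                    @Override
                    public void processBroadcastElement(String update, Context ctx, Collector<String> out)
                            throws Exception {
                        // update is assumed to be "key=value", e.g. "filter.keyword=error"
                        String[] kv = update.split("=", 2);
                        ctx.getBroadcastState(CONFIG_DESCRIPTOR).put(kv[0], kv[1]);
                    }
                });
    }
}
```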


Re: Error with extracted type from custom partitioner key

2021-06-04 Thread Timo Walther

Hi Ken,

non-POJOs are serialized with Kryo. This might not give you optimal 
performance. You can register a custom Kryo serializer in 
ExecutionConfig to speed up the serialization.


Alternatively, you can implement `ResultTypeQueryable` to provide custom
type information with a custom serializer.
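A rough sketch of the first suggestion (registering a Kryo serializer in the ExecutionConfig); MyGroupingKey is the class from this thread, and its getKey() accessor and single-argument constructor are hypothetical:

```
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.api.java.ExecutionEnvironment;

// Sketch: a custom Kryo serializer for the partitioning key type.
public class KeySerializerRegistration {

    public static class MyGroupingKeySerializer extends Serializer<MyGroupingKey> {
        @Override
        public void write(Kryo kryo, Output output, MyGroupingKey key) {
            output.writeString(key.getKey()); // getKey() is a hypothetical accessor
        }

        @Override
        public MyGroupingKey read(Kryo kryo, Input input, Class<MyGroupingKey> type) {
            return new MyGroupingKey(input.readString());
        }
    }

    public static void register(ExecutionEnvironment env) {
        env.getConfig().registerTypeWithKryoSerializer(MyGroupingKey.class, MyGroupingKeySerializer.class);
    }
}
```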


I hope this helps. Otherwise, can you share a little example of how you
would like to call partitionCustom()?


Regards,
Timo

On 04.06.21 15:38, Ken Krugler wrote:

Hi all,

I'm using Flink 1.12 and a custom partitioner/partitioning key (batch 
mode, with a DataSet) to do a better job of distributing data to tasks. 
The classes look like:


public class MyPartitioner implements Partitioner
{
     ...
}

public class MyGroupingKey implements Comparable
{
     ...
}

This worked fine, but I noticed a warning logged by Flink 
about MyGroupingKey not having an empty constructor, and thus not being 
treated as a POJO.


I added that empty constructor, and then I got an error 
because partitionCustom() only works on a single field key.


So I changed MyGroupingKey to have a single field (a string), with 
transient cached values for the pieces of the key that I need while 
partitioning. Now I get an odd error:


java.lang.RuntimeException: Error while calling custom partitioner

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast 
to MyGroupingKey

         at MyPartitioner.partition(AdsPinotFilePartitioner.java:11)
         at 
org.apache.flink.runtime.operators.shipping.OutputEmitter.customPartition(OutputEmitter.java:235)

         ... 19 more

So I've got two questions…

• Should I just get rid of the empty constructor, and have Flink treat 
it as a non-POJO? This seemed to be working fine.
• Is it a bug in Flink that the extracted field from the key is being 
used as the expected type for partitioning?


Thanks!

— Ken

--
Ken Krugler
http://www.scaleunlimited.com 
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch







Re: Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

2021-06-04 Thread Yuval Itzchakov
Hi Timo,
Thank you for the response.

The tables being created in reality are based on arbitrary SQL code such
that I don't know what the schema actually is to create the TypeInformation
"by hand" and pass it on to the DataStream API.

This leaves me with option 1, which leads to another question: If I have
some state records stored in RocksDB from a current running job in a
previous Flink version (1.9.3), will changing the TypeInformation from
TypeInformation[Row] to the ExternalTypeInformation now break the
compatibility of the state currently stored and cause them to be lost
essentially? My gut feeling says yes unless some form of backwards
compatibility is going to be written specifically for the usecase.


On Fri, Jun 4, 2021, 16:33 Timo Walther  wrote:

> Hi Yuval,
>
>
> TypeConversions.fromDataTypeToLegacyInfo was only a utility to bridge
> between TypeInformation and DataType until TypeInformation is not
> exposed through the Table API anymore.
>
> Beginning from Flink 1.13 the Table API is able to serialize the records
> to the first DataStream operator via toDataStream or toChangelogStream.
> Internally, it uses
> org.apache.flink.table.runtime.typeutils.ExternalTypeInfo for that. The
> binary representation is using internal data structures and conversion
> will be performed during serialization/deserialization:
>
> conversion -> internal -> conversion
>
> You have two possibilities:
>
> 1) You simply call `tableEnv.toDataStream(table).getType()` and pass
> this type on to the next operator.
>
> 2) You define your own TypeInformation as you would usually do it in
> DataStream API without Table API.
>
> We might serialize `Row`s with `RowSerializer` again in the near future.
> But for now we went with the most generic solution that supports
> everything that can come out of Table API.
>
> Regards,
> Timo
>
> On 04.06.21 15:12, Yuval Itzchakov wrote:
> > When upgrading to Flink 1.13, I ran into deprecation warnings on
> > TypeConversions
> >
> > image.png
> >
> > The deprecation message states that this API will be deprecated soon,
> > but does not mention the alternatives that can be used for these
> > transformations.
> >
> > My use case is that I have a table that needs to be converted into a
> > DataStream[Row] and in turn I need to apply some stateful
> > transformations on it. In order to do that I need the
> > TypeInformation[Row] produced in order to pass into the various state
> > functions.
> >
> > @Timo Walther  I would love your help on
> this.
> > --
> > Best Regards,
> > Yuval Itzchakov.
>
>


Error with extracted type from custom partitioner key

2021-06-04 Thread Ken Krugler
Hi all,

I'm using Flink 1.12 and a custom partitioner/partitioning key (batch mode, 
with a DataSet) to do a better job of distributing data to tasks. The classes 
look like:

public class MyPartitioner implements Partitioner 
{
...
}

public class MyGroupingKey implements Comparable 
{
...
}

This worked fine, but I noticed a warning logged by Flink about MyGroupingKey 
not having an empty constructor, and thus not being treated as a POJO.

I added that empty constructor, and then I got an error because 
partitionCustom() only works on a single field key.

So I changed MyGroupingKey to have a single field (a string), with transient 
cached values for the pieces of the key that I need while partitioning. Now I 
get an odd error:

java.lang.RuntimeException: Error while calling custom partitioner

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to 
MyGroupingKey
at MyPartitioner.partition(AdsPinotFilePartitioner.java:11)
at 
org.apache.flink.runtime.operators.shipping.OutputEmitter.customPartition(OutputEmitter.java:235)
... 19 more

So I've got two questions…

• Should I just get rid of the empty constructor, and have Flink treat 
it as a non-POJO? This seemed to be working fine.
• Is it a bug in Flink that the extracted field from the key is being 
used as the expected type for partitioning?

Thanks!

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

2021-06-04 Thread Timo Walther

Hi Yuval,


TypeConversions.fromDataTypeToLegacyInfo was only a utility to bridge 
between TypeInformation and DataType until TypeInformation is not 
exposed through the Table API anymore.


Beginning from Flink 1.13 the Table API is able to serialize the records 
to the first DataStream operator via toDataStream or toChangelogStream. 
Internally, it uses 
org.apache.flink.table.runtime.typeutils.ExternalTypeInfo for that. The 
binary representation is using internal data structures and conversion 
will be performed during serialization/deserialization:


conversion -> internal -> conversion

You have two possibilities:

1) You simply call `tableEnv.toDataStream(table).getType()` and pass 
this type on to the next operator.


2) You define your own TypeInformation as you would usually do it in 
DataStream API without Table API.


We might serialize `Row`s with `RowSerializer` again in the near future. 
But for now we went with the most generic solution that supports 
everything that can come out of Table API.
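A small sketch of possibility 1), assuming Flink 1.13, a StreamTableEnvironment, and that the first column is a String key:

```
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Sketch of possibility 1): reuse the TypeInformation that toDataStream() attaches to the stream.
public class Option1Example {
    public static DataStream<Row> keyAndForward(StreamTableEnvironment tableEnv, Table table) {
        DataStream<Row> stream = tableEnv.toDataStream(table);
        TypeInformation<Row> rowTypeInfo = stream.getType(); // reuse in state descriptors, returns(), etc.

        return stream
                .keyBy(row -> (String) row.getField(0), Types.STRING) // field 0 assumed to be a String key
                .map(row -> row)
                .returns(rowTypeInfo); // keeps the same Row serialization downstream
    }
}
```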


Regards,
Timo

On 04.06.21 15:12, Yuval Itzchakov wrote:
When upgrading to Flink 1.13, I ran into deprecation warnings on 
TypeConversions


image.png

The deprecation message states that this API will be deprecated soon, 
but does not mention the alternatives that can be used for these 
transformations.


My use case is that I have a table that needs to be converted into a 
DataStream[Row] and in turn I need to apply some stateful 
transformations on it. In order to do that I need the 
TypeInformation[Row] produced in order to pass into the various state 
functions.


@Timo Walther  I would love your help on this.
--
Best Regards,
Yuval Itzchakov.




Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

2021-06-04 Thread Yuval Itzchakov
When upgrading to Flink 1.13, I ran into deprecation warnings on
TypeConversions

[image: image.png]

The deprecation message states that this API will be deprecated soon, but
does not mention the alternatives that can be used for these
transformations.

My use case is that I have a table that needs to be converted into a
DataStream[Row] and in turn I need to apply some stateful transformations
on it. In order to do that I need the TypeInformation[Row] produced in
order to pass into the various state functions.

@Timo Walther  I would love your help on this.
-- 
Best Regards,
Yuval Itzchakov.


Re: ByteSerializationSchema in PyFlink

2021-06-04 Thread Wouter Zorgdrager
Hi Dian, all,

Thanks for your suggestion. Unfortunately, it does not seem to work. I get
the following exception:

Caused by: java.lang.NegativeArraySizeException: -2147183315
at
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
at
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:31)
at
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema.deserialize(TypeInformationSerializationSchema.java:92)
at
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

To be more precise, the messages in my Kafka topic are pickled Python
objects. Maybe that is the reason for the exception. I also tried
using Types.PICKLED_BYTE_ARRAY().get_java_type_info(),
but I think that has the same serializer, because I get the same exception.

Any suggestions? Thanks for your help!

Regards,
Wouter

On Fri, 4 Jun 2021 at 08:24, Dian Fu  wrote:

> Hi Wouter,
>
> E   org.apache.flink.api.python.shaded.py4j.Py4JException: 
> Constructor 
> org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class
>  org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class 
> org.apache.flink.configuration.Configuration]) does not exist
>
>
> As the exception indicate, the constructor doesn’t exists.
>
>
>
> Could you try with the following:
>
> ```
> j_type_info= Types.PRIMITIVE_ARRAY(Types.BYTE()).get_java_type_info()
> j_type_serializer=
>  
> j_type_info.createSerializer(gate_way.jvm.org.apache.flink.api.common.ExecutionConfig())
>
> j_byte_string_schema = 
> gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(j_type_info,
>  j_type_serializer)
>
> ```
>
> Regards,
> Dian
>
> 2021年6月3日 下午8:51,Wouter Zorgdrager  写道:
>
> Hi all,
>
> I have a PyFlink job connected to a KafkaConsumer and Producer. I want to
> directly work with the bytes from and to Kafka because I want to
> serialize/deserialize in my Python code rather than the JVM environment.
> Therefore, I can't use the SimpleStringSchema for (de)serialization (the
> messages aren't strings anyways). I've tried to create a
> TypeInformationSerializer with Types.BYTE(), see the code snippet below:
>
> class ByteSerializer(SerializationSchema, DeserializationSchema):
> def __init__(self, execution_environment):
> gate_way = get_gateway()
>
> j_byte_string_schema = 
> gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
> Types.BYTE().get_java_type_info(),
> get_j_env_configuration(execution_environment),
> )
> SerializationSchema.__init__(self, 
> j_serialization_schema=j_byte_string_schema)
> DeserializationSchema.__init__(
> self, j_deserialization_schema=j_byte_string_schema
> )
>
> The ByteSerializer is used like this:
>
>
> return FlinkKafkaConsumer(
> ["client_request", "internal"],
> ByteSerializer(self.env._j_stream_execution_environment),
> {
> "bootstrap.servers": "localhost:9092",
> "auto.offset.reset": "latest",
> "group.id": str(uuid.uuid4()),
> },
> )
> However, this does not seem to work. I think the error is thrown in the JVM 
> environment, which makes it a bit hard to parse in my Python stack trace,
>
> but I think it boils down to this stacktrace part:
>
>
> answer = 'xsorg.apache.flink.api.python.shaded.py4j.Py4JException: 
> Constructor 
> org.apache.flink.api.common.serialization.TypeIn...haded.py4j.GatewayConnection.run(GatewayConnection.java:238)\\n\tat
>  java.base/java.lang.Thread.run(Thread.java:834)\\n'
> gateway_client = 
> target_id = None
> name = 
> 'org.apache.flink.api.common.serialization.TypeInformationSerializationSchema'
>
> def get_return_value(answer, gateway_client, target_id=None, name=None):
> """Converts an answer received from the Java gateway into a Python 
> object.
>
> For example, string representation of integers are converted to Python
> 

Re: Flink checkpoint is very slow - troubleshooting

2021-06-04 Thread yidan zhao
The official docs cover this. Essentially, if a single operation takes 1 ms, then with synchronous calls one second only fits about 1000 operations, which is far too low a QPS. Asynchronous access can raise the QPS dramatically.

Jacob <17691150...@163.com> wrote on Fri, Jun 4, 2021 at 5:58 PM:
>
> Yes, your description is correct; that is roughly how the job executes.
>
>
> I see what you mean now.
>
> Thanks for the idea. I need to study this async operator; I have never used it before and am not clear on what the workflow looks like. Do you have a related demo, or which part of the documentation should I look at?
>
>
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Reply: Flink 1.12, yarn-application mode: no logs visible in the Flink web UI

2021-06-04 Thread smq
Thanks a lot. I will test it tomorrow; once it is resolved, I will write up a description of this problem.





-- Original message --
From: r pp
http://apache-flink.147419.n8.nabble.com/

--
Best,
pp

Re: Flink sql regular join not working as expect.

2021-06-04 Thread JING ZHANG
Hi,
The JDBC source only takes a snapshot and sends all data in that snapshot
downstream when it works as the right stream of a regular join; it cannot
produce a changelog stream.
After you update the field 'target' from '56.32.15.55:8080' to
'56.32.15.54:8080', the JDBC source will not send the new data downstream.

You could try to use Upsert Kafka [1] as the right side of the regular join and
set `source` as the primary key.

BTW, if you use a Processing Time Temporal Join [2] in your case, you would always
join against the latest version of the dimension table, but updates on the dimension table
would not trigger new join results, because the dimension table is only used for lookups by key.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins

Best regards,
JING ZHANG


Yun Gao wrote on Fri, Jun 4, 2021 at 5:07 PM:

> Hi,
>
> I'm not the expert for the table/sql, but it seems to me that for regular
> joins, Flink would not re-read the dimension
> table after it has read it fully for the first time. If you want to always
> join the records with the latest version of
> dimension table, you may need to use the temporal joins [1].
>
> Best,
> Yun
>
>
> [1]
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins
>
>
> --
> From:1095193...@qq.com <1095193...@qq.com>
> Send Time:2021 Jun. 4 (Fri.) 16:45
> To:user 
> Subject:Flink sql regular join not working as expect.
>
> Hi
>I am working on joining a Kafka stream with a Postgres Dimension
> table.  According to:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/
>  *"**Regular joins are the most generic type of join in which any new
> record, or changes to either side of the join, are visible and affect the
> entirety of the join result."*
>However, in my test, changing a record in the dimension table does not affect
> the result of the join.  My test steps:
>1. create Kafka table sql
>   CREATE TABLE test1 (  source String )  WITH (  'connector' =
> 'kafka',   'topic' = 'test' ...)
>2.create dimension table sql
>  CREATE TABLE test2 (source String, target String)  WITH  (
> 'connector' = 'jdbc'... )
>  Prepared 1 record in dimenion table:
>  source  |   target
>   172.16.1.109:8080   | 56.32.15.55:8080
>3. regular join sql
>select test1.source, test2.target from test1 join test2 on
> test1.source = test2.source
>4. feed data into Kafka
>   {"source":"172.16.1.109:8080"}
>   Flink could output result as expect:  +I[172.16.1.109:8080,
> 56.32.15.55:8080]
>5. change field 'target'  from '56.32.15.55:8080' to '56.32.15.54:8080'
> in dimension table:
>   source  |   target
> 172.16.1.109:8080 56.32.15.54:8080
>6. feed data into Kafka
>   {"source":"172.16.1.109:8080"}
>   Flink still output result as not affected by changes to dimension
> table:  +I[172.16.1.109:8080, 56.32.15.55:8080]
>   Expect result:  +I[172.16.1.109:8080,
> 56.32.15.54:8080]
> Could you give me some suggestions as to why the regular join result is not
> affected by changes to the dimension table in my test? Much appreciated.
>
> --
> 1095193...@qq.com
>
>
>


Re: Flink 1.12, yarn-application mode: no logs visible in the Flink web UI

2021-06-04 Thread r pp
Hi~
 On my side it is the per-job-on-YARN mode.

 I looked at the YARN container logs. Normally, during program initialization the log file name and
 log path are generated, turned into environment variables (env), then assembled into the config, and only then is the cluster started.

The log path is determined by the YARN configuration; Flink then picks up this information for display in the web UI.

So you may need to check whether your log file path has changed. In my test, changing the log file name still displays fine in the Flink web UI, but changing the path makes the web UI
unable to display the logs.

However, I cannot narrow down the exact difference; too little information was provided.
Environment info:
2021-06-04 17:38:15,417 INFO org.apache.flink.runtime.entrypoint.
ClusterEntrypoint [] - -Dlog
.file=/yarn/container-logs/application_1622784975053_0013/container_1622784975053_0013_01_01/jobmanager.log
Cluster startup info:
2021-06-04 16:47:01,429 INFO org.apache.flink.runtime.entrypoint.
ClusterEntrypoint [] - YARN daemon is running as: hdfs Yarn client user
obtainer: hdfs

zilong xiao wrote on Thu, Jun 3, 2021 at 2:17 PM:

> 1.10 uses log4j 1 by default; 1.12 uses log4j 2
>
> smq <374060...@qq.com> wrote on Wed, Jun 2, 2021 at 3:26 PM:
>
> >
> >
> Do you mean the configuration in log4j.properties? We configured the format of the generated log files there; it was added on the installation nodes, but that should not be what is shown in the web UI. The strange thing is that some other programs in our team work fine, while some do not show logs in the web UI. We recently upgraded from 1.10 to 1.12, and this situation appeared with 1.12.
> >
> >
> >
> >
> >
> > -- Original message --
> > From: r pp  > Sent: 2021-06-02 15:08
> > To: user-zh  > Subject: Reply: Flink 1.12, yarn-application mode: no logs visible in the Flink web UI
> >
> >
> >
> > Hi~ Did you change the log file names?
> >
> > smq <374060...@qq.com wrote on Wed, Jun 2, 2021 at 12:24 PM:
> >
> >  Did you solve this? I ran into the same problem as well.
> > 
> > 
> > 
> > 
> > 
> >  -- Original message --
> >  From: todd  >  Sent: 2021-04-14 19:11
> >  To: user-zh  >  Subject: Reply: Flink 1.12, yarn-application mode: no logs visible in the Flink web UI
> > 
> > 
> > 
> >  On YARN there are only the .out and .error logs, but the log panel in the Flink web UI cannot display the log content.
> > 
> > 
> > 
> >  --
> >  Sent from: http://apache-flink.147419.n8.nabble.com/
> >
> >
> >
> > --
> > Best,
> >  pp
>


-- 
Best,
  pp


Re: open checkpoint, send message to kafka OutOfOrderSequenceException

2021-06-04 Thread Yun Gao
Hi,

Have you checked whether the error occurs during normal execution, or right after a failover?

Best,
Yun


--
From:SmileSmile 
Send Time:2021 Jun. 4 (Fri.) 11:07
To:user 
Subject:open checkpoint, send message to kafka OutOfOrderSequenceException

Dear all:
  The Flink version is 1.12.4 and the Kafka version is 1.1.1. The topology is very
simple: source --> flatmap --> sink, with checkpointing enabled. The job fails after a few
hours. The error message is:


Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: 
Failed to send data to Kafka: The broker received an out of order sequence 
number.
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
 
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:850)
 
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
 


The producer's properties are simple, using the default AT_LEAST_ONCE semantic:
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "5");

properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 
"1");
properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "2000");
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "524288");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "100");
properties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
"6");

My question is: how should I deal with this?




Re: Failed to cancel a job using the STOP rest API

2021-06-04 Thread Yun Gao
Hi Thomas,

I think you are right that the CLI is also using the same rest API underlying, 
and since
the response of the rest API is ok and the savepoint is triggered successfully, 
I reckon
that it might not be due to rest API process, and we might still first focus on 
the 
stop-with-savepoint process.

Currently, stop-with-savepoint first takes a savepoint and then cancels all the
sources to
stop the job. Thus, are the sources all legacy sources (namely, ones using
SourceFunction)?
And does the source implement the cancel() method correctly?

Best,
Yun


--
From:Thomas Wang 
Send Time:2021 Jun. 4 (Fri.) 12:55
To:user 
Subject:Failed to cancel a job using the STOP rest API

Hi, Flink community,

I'm trying to use the STOP rest API to cancel a job. So far, I'm seeing some 
inconsistent results. Sometimes, jobs could be cancelled successfully while 
other times, they couldn't. Either way, the POST request is accepted with a 
status code 202 and a "request-id".

From the Flink UI, I can see the savepoint being completed successfully. 
However the job is still in running state afterwards. The CLI command `flink 
stop ` is working ok. I can use the CLI to stop the job and get the 
resulting savepoint location. If I understand this correctly, the CLI should be 
using the same REST API behind the scenes, isn't it?

Here is my POST request URL: 
`http://.ec2.internal:33557/jobs/5dce58f02d1f5e739ab12f88b2f5f031/stop`.

Here is the BODY of the request: 
`{"drain":false,"targetDirectory":"s3:///flink-savepoint"}`.

I'm using Flink 1.11.2 Commit ID: DeadD0d0.

Any suggestions on how I can debug this?

Another question is, given the response "request-id", which endpoint should I 
query to get the status of the request? Most importantly, where can I get the 
expected savepoint location?

Thanks.

Thomas



Re: Flink sql regular join not working as expect.

2021-06-04 Thread Yun Gao
Hi, 

I'm not an expert on the Table/SQL API, but it seems to me that for regular
joins, Flink does not re-read the dimension
table after it has read it fully for the first time. If you want to always join
the records with the latest version of
the dimension table, you may need to use temporal joins [1].

Best,
Yun


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins



--
From:1095193...@qq.com <1095193...@qq.com>
Send Time:2021 Jun. 4 (Fri.) 16:45
To:user 
Subject:Flink sql regular join not working as expect.

Hi
   I am working on joining a Kafka stream with a Postgres Dimension table.  
Accoring to: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/
 "Regular joins are the most generic type of join in which any new record, or 
changes to either side of the join, are visible and affect the entirety of the 
join result."
   However, in my test, changing a record in the dimension table does not affect the
result of the join.  My test steps:
  1. create Kafka table sql 
  CREATE TABLE test1 (  source String )  WITH (  'connector' = 'kafka',   
'topic' = 'test' ...)
   2.create dimension table sql 
 CREATE TABLE test2 (source String, target String)  WITH  ( 'connector' 
= 'jdbc'... )
 Prepared 1 record in dimenion table:
 source  |   target
 172.16.1.109:8080   | 56.32.15.55:8080 
  3. regular join sql
 select test1.source, test2.target from test1 join test2 on test1.source = 
test2.source
   4. feed data into Kafka
  {"source":"172.16.1.109:8080"}
  Flink could output result as expect:  +I[172.16.1.109:8080, 
56.32.15.55:8080]
   5. change field 'target'  from '56.32.15.55:8080' to '56.32.15.54:8080' in 
dimension table:  
  source  |   target
172.16.1.109:8080 56.32.15.54:8080
   6. feed data into Kafka
  {"source":"172.16.1.109:8080"}
  Flink still output result as not affected by changes to dimension table:  
+I[172.16.1.109:8080, 56.32.15.55:8080]
  Expect result:  +I[172.16.1.109:8080, 56.32.15.54:8080]
Could you give me some suggestions why regualar join result not be affected 
by changes to dimension table in mytest? Appreciation.



1095193...@qq.com



Flink sql regular join not working as expect.

2021-06-04 Thread 1095193...@qq.com
Hi
   I am working on joining a Kafka stream with a Postgres dimension table. According to 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/:
"Regular joins are the most generic type of join in which any new record, or changes to 
either side of the join, are visible and affect the entirety of the join result."
   However, in my test, changing a record in the dimension table does not affect the 
result of the join. My test steps:
   1. Create the Kafka table:
  CREATE TABLE test1 ( source String ) WITH ( 'connector' = 'kafka', 'topic' = 'test' ... )
   2. Create the dimension table:
  CREATE TABLE test2 ( source String, target String ) WITH ( 'connector' = 'jdbc' ... )
  Prepared 1 record in the dimension table:
  source              |   target
  172.16.1.109:8080   |   56.32.15.55:8080
   3. Regular join SQL:
  select test1.source, test2.target from test1 join test2 on test1.source = test2.source
   4. Feed data into Kafka:
  {"source":"172.16.1.109:8080"}
  Flink outputs the result as expected: +I[172.16.1.109:8080, 56.32.15.55:8080]
   5. Change the field 'target' from '56.32.15.55:8080' to '56.32.15.54:8080' in the 
dimension table:
  source              |   target
  172.16.1.109:8080   |   56.32.15.54:8080
   6. Feed data into Kafka again:
  {"source":"172.16.1.109:8080"}
  Flink still outputs the old result, unaffected by the change to the dimension table: 
+I[172.16.1.109:8080, 56.32.15.55:8080]
  Expected result: +I[172.16.1.109:8080, 56.32.15.54:8080]
Could you give me some suggestions on why the regular join result is not affected by 
changes to the dimension table in my test? I appreciate any help.



1095193...@qq.com


Re: Flink checkpoint 速度很慢 问题排查

2021-06-04 Thread yidan zhao
I see what you mean: each input record goes through Redis/HBase access plus some
adjustments (e.g. setting fields), and the record then has to be emitted as the output
of this operator, right?

What I meant is that you should access Redis and HBase asynchronously, implemented with
Flink's async I/O operator. So the requirement you describe can indeed be met with the
asynchronous approach.
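
A minimal sketch of such an async enrichment operator; the blocking lookup is a
placeholder for the real Redis/HBase read-and-update logic, and the pool size, timeout
and queue capacity are assumptions.

```
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncEnrichFunction extends RichAsyncFunction<String, String> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(10);
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        // Run the blocking lookup on a dedicated pool and complete the future when done.
        CompletableFuture
                .supplyAsync(() -> blockingLookup(input), executor)
                .thenAccept(enriched -> resultFuture.complete(Collections.singleton(enriched)));
    }

    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) {
        // Decide how to handle timeouts; here the record is passed through unchanged.
        resultFuture.complete(Collections.singleton(input));
    }

    @Override
    public void close() {
        executor.shutdown();
    }

    // Placeholder for the synchronous Redis/HBase read-and-update logic.
    private String blockingLookup(String key) {
        return key;
    }

    // orderedWait emits records in the original input order.
    public static DataStream<String> apply(DataStream<String> input) {
        return AsyncDataStream.orderedWait(
                input, new AsyncEnrichFunction(), 30, TimeUnit.SECONDS, 100);
    }
}
```

orderedWait preserves the input order of the records; unorderedWait gives lower latency
but no ordering guarantee.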

Jacob <17691150...@163.com> wrote on Fri, Jun 4, 2021 at 3:21 PM:
>
> @nobleyd Thanks for the reply.
>
> Is the Redis/HBase access in your job A asynchronous or synchronous? --- Synchronous
>
> You are probably using aligned checkpoints, right? --- Yes
>
>
> We use synchronous access because we need to produce the new data promptly; with
> asynchronous access we could not get the latest result data immediately.
>
> I just switched the checkpoints to unaligned mode. Judging from the ten checkpoints
> completed so far, the state size has indeed increased, but the speed has not improved yet.
>
>
> The message volume is indeed large and the processing logic is fairly complex. I set the
> parallelism of the processing operator to 100, and the source parallelism equals the
> number of topic partitions.
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


flink 1.11 application模式 使用 k8s时如何指定拉起的taskmanager数目

2021-06-04 Thread Jun Zou
Hi all,
  I am submitting a job on K8s with Flink 1.11.2 in application mode, and the number of
TaskManagers the job requests does not match what I expect.

The job uses the DataStream API to register a Kafka source and an HDFS sink, with SQL
for the intermediate operations; the SQL logic is map-only and the Kafka topic has 4
partitions.
First, I submitted the same kind of job on YARN with the following parameters:

> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 4

which brought up 4 TaskManagers.

On K8s, however, I configured the following parameters:

> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 4
> kubernetes.taskmanager.cpu: 1

yet only one TaskManager was requested.

In addition, when I submit the TopSpeedWindowing example as a jar job on K8s, it does
bring up the correct number of TaskManagers.


Re: ByteSerializationSchema in PyFlink

2021-06-04 Thread Dian Fu
Hi Wouter,

> E   org.apache.flink.api.python.shaded.py4j.Py4JException: 
> Constructor 
> org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class
>  org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class 
> org.apache.flink.configuration.Configuration]) does not exist

As the exception indicates, the constructor doesn't exist.



Could you try with the following:

```
j_type_info = Types.PRIMITIVE_ARRAY(Types.BYTE()).get_java_type_info()
j_type_serializer = j_type_info.createSerializer(
    gate_way.jvm.org.apache.flink.api.common.ExecutionConfig())
j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
    j_type_info, j_type_serializer)
```

Regards,
Dian

> On Jun 3, 2021, at 8:51 PM, Wouter Zorgdrager  wrote:
> 
> Hi all,
> 
> I have a PyFlink job connected to a KafkaConsumer and Producer. I want to 
> directly work with the bytes from and to Kafka because I want to 
> serialize/deserialize in my Python code rather than the JVM environment. 
> Therefore, I can't use the SimpleStringSchema for (de)serialization (the 
> messages aren't strings anyways). I've tried to create a 
> TypeInformationSerializer with Types.BYTE(), see the code snippet below:
> 
> class ByteSerializer(SerializationSchema, DeserializationSchema):
> def __init__(self, execution_environment):
> gate_way = get_gateway()
> 
> j_byte_string_schema = 
> gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
> Types.BYTE().get_java_type_info(),
> get_j_env_configuration(execution_environment),
> )
> SerializationSchema.__init__(self, 
> j_serialization_schema=j_byte_string_schema)
> DeserializationSchema.__init__(
> self, j_deserialization_schema=j_byte_string_schema
> )
> The ByteSerializer is used like this:
> 
> return FlinkKafkaConsumer(
> ["client_request", "internal"],
> ByteSerializer(self.env._j_stream_execution_environment),
> {
> "bootstrap.servers": "localhost:9092",
> "auto.offset.reset": "latest",
> "group.id ": str(uuid.uuid4()),
> },
> )
> 
> However, this does not seem to work. I think the error is thrown in the JVM 
> environment, which makes it a bit hard to parse in my Python stack trace,
> but I think it boils down to this stacktrace part:
> 
> answer = 'xsorg.apache.flink.api.python.shaded.py4j.Py4JException: 
> Constructor 
> org.apache.flink.api.common.serialization.TypeIn...haded.py4j.GatewayConnection.run(GatewayConnection.java:238)\\n\tat
>  java.base/java.lang.Thread.run(Thread.java:834)\\n'
> gateway_client = 
> target_id = None
> name = 
> 'org.apache.flink.api.common.serialization.TypeInformationSerializationSchema'
> 
> def get_return_value(answer, gateway_client, target_id=None, name=None):
> """Converts an answer received from the Java gateway into a Python 
> object.
> 
> For example, string representation of integers are converted to Python
> integer, string representation of objects are converted to JavaObject
> instances, etc.
> 
> :param answer: the string returned by the Java gateway
> :param gateway_client: the gateway client used to communicate with 
> the Java
> Gateway. Only necessary if the answer is a reference (e.g., 
> object,
> list, map)
> :param target_id: the name of the object from which the answer comes 
> from
> (e.g., *object1* in `object1.hello()`). Optional.
> :param name: the name of the member from which the answer comes from
> (e.g., *hello* in `object1.hello()`). Optional.
> """
> if is_error(answer)[0]:
> if len(answer) > 1:
> type = answer[1]
> value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
> if answer[1] == REFERENCE_TYPE:
> raise Py4JJavaError(
> "An error occurred while calling {0}{1}{2}.\n".
> format(target_id, ".", name), value)
> else:
> >   raise Py4JError(
> "An error occurred while calling {0}{1}{2}. 
> Trace:\n{3}\n".
> format(target_id, ".", name, value))
> E   py4j.protocol.Py4JError: An error occurred while calling 
> None.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema.
>  Trace:
> E   org.apache.flink.api.python.shaded.py4j.Py4JException: 
> Constructor 
> org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class
>  org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class 
> org.apache.flink.configuration.Configuration]) does not exist
> E at 
> 

Re: Flink stream processing issue

2021-06-04 Thread yidan zhao
Yes, if you use a KeyedCoProcessFunction, Flink will ensure that.
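
A minimal sketch of that setup; the record type and key extraction are assumptions.
Keying both inputs of the connected streams on the same field routes equal keys from
either stream to the same parallel instance of the KeyedCoProcessFunction.

```
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class SameKeySameSubtask {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> streamA = env.fromElements("k1", "k2", "k3");
        DataStream<String> streamB = env.fromElements("k1", "k2", "k3");

        // Keying both inputs on the same field guarantees that records with equal keys,
        // no matter which stream they come from, reach the same parallel subtask.
        streamA.connect(streamB)
                .keyBy(
                        new KeySelector<String, String>() {
                            @Override
                            public String getKey(String value) {
                                return value; // extract the shared key here
                            }
                        },
                        new KeySelector<String, String>() {
                            @Override
                            public String getKey(String value) {
                                return value;
                            }
                        })
                .process(new KeyedCoProcessFunction<String, String, String, String>() {
                    @Override
                    public void processElement1(String value, Context ctx, Collector<String> out) {
                        out.collect("A: " + value);
                    }

                    @Override
                    public void processElement2(String value, Context ctx, Collector<String> out) {
                        out.collect("B: " + value);
                    }
                })
                .print();

        env.execute("keyed-co-process-sketch");
    }
}
```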

Qihua Yang  wrote on Fri, Jun 4, 2021 at 12:32 AM:
>
> Sorry for the confusion. Yes, I mean multiple parallelism. Thanks a lot for
> your help.
>
> Thanks,
> Qihua
>
> On Thu, Jun 3, 2021 at 12:03 AM JING ZHANG  wrote:
>>
>> Hi Qihua,
>>
>> I’m sorry I didn’t understand what you mean by ‘replica’. Would you please 
>> explain a little more?
>> If you mean your job has multiple parallelism and you are asking whether the same
>> data from the two inputs would be sent to the same downstream subtask after the
>> `keyedCoProcessFunction`:
>> yes, Flink can do this if you keyBy the same field for both inputs.
>>
>> Best regards,
>> JING ZHANG
>>
Qihua Yang  wrote on Thu, Jun 3, 2021 at 12:25 PM:
>>>
>>> Hi,
>>>
>>> I have a question. We have two data streams that may contain duplicate 
>>> data. We are using keyedCoProcessFunction to process stream data. I defined 
>>> the same keySelector for both streams. Our flink application has multiple 
>>> replicas. We want the same data to be processed by the same replica. Can 
>>> flink ensure that?
>>>
>>> Thanks,
>>> Qihua


Re: Flink checkpoint 速度很慢 问题排查

2021-06-04 Thread yidan zhao
Is the Redis/HBase access in your job A asynchronous or synchronous? Synchronous access
definitely won't work. The checkpoints are small because there is hardly any state; the
long duration is most likely the data alignment time, so you are probably using aligned
checkpoints, right? If you switch to unaligned checkpoints the duration should come
down, but the state will become much larger; you can give it a try.
The best approach is to make the access asynchronous; synchronous access won't do.
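
A minimal sketch of switching a job to unaligned checkpoints; the checkpoint interval is
an assumption.

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60s and use unaligned checkpoints, which trades larger
        // checkpoint state for shorter alignment time under backpressure.
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        // ... build the actual job here, then run it with env.execute("job-name") ...
    }
}
```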

JasonLee <17610775...@163.com> wrote on Fri, Jun 4, 2021 at 10:57 AM:
>
> hi
>
> Just keep the source parallelism equal to the number of partitions; do not make it
> larger than the partition count, because that would leave subtasks idle and waste
> resources. You only need to increase the parallelism of the map operator.
>
>
>
> -
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/