Re: Flink1.12 用官方默认的log4j2打日志到kafka,如何区分jobmanager和TaskManager的日志?怎么加参数?

2021-06-09 Thread yujianbo
log4j可以,log4j2也可以,现在头疼已经实现打kafka,不知道怎么区分这两边的日志



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink1.13.1启动后web-ui查看taskmanager信息失败并报错,稳定复现。

2021-06-09 Thread yidan zhao
如题,今天从1.12升级到1.13.1,启动standalone集群后。找到web-ui,点taskmanagers,出现列表,然后点任意taskmanager进行查看信息。右上角弹提示Internal
server error。查看了该JM的日志,后面附,主要报错是 Caused by:
java.io.NotSerializableException:
org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots。

此外,经过排查,并不是访问任意JM的rest地址都会有此问题,目前初步规律是,直接访问leader的web-ui无问题,访问其他地址的web-ui有此问题。


报错时JM的错误日志:

2021-06-10 13:00:27,395 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -

2021-06-10 13:00:27,399 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
Preconfiguration:
2021-06-10 13:00:27,400 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -


RESOURCE_PARAMS extraction logs:
jvm_params: -Xmx2093796552 -Xms2093796552 -XX:MaxMetaspaceSize=536870912
dynamic_configs: -D jobmanager.memory.off-heap.size=268435456b -D
jobmanager.memory.jvm-overhead.min=322122552b -D
jobmanager.memory.jvm-metaspace.size=536870912b -D
jobmanager.memory.heap.size=2093796552b -D
jobmanager.memory.jvm-overhead.max=322122552b
logs: INFO  [] - Loading configuration property:
taskmanager.numberOfTaskSlots, 20
INFO  [] - Loading configuration property: cluster.evenly-spread-out-slots, true
INFO  [] - Loading configuration property: parallelism.default, 1
INFO  [] - Loading configuration property: jobmanager.memory.process.size, 3gb
INFO  [] - Loading configuration property:
jobmanager.memory.jvm-metaspace.size, 512mb
INFO  [] - Loading configuration property:
jobmanager.memory.jvm-overhead.fraction, 0.1
INFO  [] - Loading configuration property:
jobmanager.memory.jvm-overhead.min, 192mb
INFO  [] - Loading configuration property:
jobmanager.memory.jvm-overhead.max, 512mb
INFO  [] - Loading configuration property:
jobmanager.memory.off-heap.size, 256mb
INFO  [] - Loading configuration property: taskmanager.memory.process.size, 20gb
INFO  [] - Loading configuration property:
taskmanager.memory.jvm-metaspace.size, 512mb
INFO  [] - Loading configuration property:
taskmanager.memory.jvm-overhead.fraction, 0.1
INFO  [] - Loading configuration property:
taskmanager.memory.jvm-overhead.min, 192mb
INFO  [] - Loading configuration property:
taskmanager.memory.jvm-overhead.max, 512mb
INFO  [] - Loading configuration property: taskmanager.memory.segment-size, 32kb
INFO  [] - Loading configuration property:
taskmanager.memory.managed.fraction, 0.4
INFO  [] - Loading configuration property: taskmanager.memory.managed.size, 64mb
INFO  [] - Loading configuration property:
taskmanager.memory.network.fraction, 0.1
INFO  [] - Loading configuration property: taskmanager.memory.network.min, 1gb
INFO  [] - Loading configuration property: taskmanager.memory.network.max, 2gb
INFO  [] - Loading configuration property:
taskmanager.memory.framework.off-heap.size, 256mb
INFO  [] - Loading configuration property:
taskmanager.memory.task.off-heap.size, 512mb
INFO  [] - Loading configuration property:
taskmanager.memory.framework.heap.size, 256mb
INFO  [] - Loading configuration property: high-availability, zookeeper
INFO  [] - Loading configuration property:
high-availability.storageDir, bos://flink-bucket/flink/ha
INFO  [] - Loading configuration property:
high-availability.zookeeper.quorum,
bjhw-aisecurity-cassandra01.bjhw:9681,bjhw-aisecurity-cassandra02.bjhw:9681,bjhw-aisecurity-cassandra03.bjhw:9681,bjhw-aisecurity-cassandra04.bjhw:9681,bjhw-aisecurity-cassandra05.bjhw:9681
INFO  [] - Loading configuration property:
high-availability.zookeeper.path.root, /flink
INFO  [] - Loading configuration property:
high-availability.cluster-id, opera_upd_FlinkTestJob3
INFO  [] - Loading configuration property: web.checkpoints.history, 100
INFO  [] - Loading configuration property: state.checkpoints.num-retained, 100
INFO  [] - Loading configuration property: state.checkpoints.dir,
bos://flink-bucket/flink/default-checkpoints
INFO  [] - Loading configuration property: state.savepoints.dir,
bos://flink-bucket/flink/default-savepoints
INFO  [] - Loading configuration property:
jobmanager.execution.failover-strategy, region
INFO  [] - Loading configuration property: web.submit.enable, false
INFO  [] - Loading configuration property: jobmanager.archive.fs.dir,
bos://flink-bucket/flink/completed-jobs/opera_upd_FlinkTestJob3
INFO  [] - Loading configuration property:
historyserver.archive.fs.dir,
bos://flink-bucket/flink/completed-jobs/opera_upd_FlinkTestJob3
INFO  [] - Loading configuration property:
historyserver.archive.fs.refresh-interval, 1
INFO  [] - Loading configuration property: rest.port, 8600
INFO  [] - Loading configuration property: historyserver.web.port, 8700
INFO  [] - Loading configuration property:
high-availability.jobmanager.port, 9318
INFO  [] - Loading configuration property: blob.server.port, 9320
INFO  [] - Loading configuration property: taskmanager.rpc.port, 9319
INFO  [] - Loading configuration property: taskmanager.data.port, 9325
INFO  [] - Loading configuration property:

Re: Flink1.12 用官方默认的log4j2打日志到kafka,如何区分jobmanager和TaskManager的日志?怎么加参数?

2021-06-09 Thread yidan zhao
log4j还可以打印到kafka吗。

yujianbo <15205029...@163.com> 于2021年6月10日周四 上午11:47写道:
>
> 版本:1.12
> 框架:用默认的log4j2框架
> 问题:日志打到kafka,如何去区分jobmanager和taskmanger日志?我发现去改layout.pattern,还是没有能找到区分的好办法?
>
>
> appender.kafka.layout.type=PatternLayout
> appender.kafka.layout.pattern=%d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x -
> %m%n -- %t -- %F
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flinksql ttl不生效

2021-06-09 Thread HunterXHunter
建议关闭state.backend.incremental ,因为我遇到过,开启增量ckp导致ckp一直增大,关掉就正常了



--
Sent from: http://apache-flink.147419.n8.nabble.com/


FileSource may cause akka.pattern.AskTimeoutException, and akka.ask.timeout not workable

2021-06-09 Thread 陳樺威
Hello all,

Our team encounter *akka.pattern.AskTimeoutException *when start
jobmanager. Base on the error message, we try to setup *akka.ask.timeout *
and* web.timeout *to 360s, but both of them doesn't work.

We guess the issue may cause by *FileSource.forRecordFileFormat.* The
application will load files in batch mode to rebuild our historical data.
The job can run normally in small batch. But it will be broken when run
over lots of files. (around 3 files distributed in 1500 folders)

The flink application is on kubernetes in application mode and files stores
in Google Cloud Storage.

Our questions are,
1. How to enlarge akka.ask.timeout correctly in our case?
2. Is it cause by FileSource? If yes, could you provide some suggestions to
prevent it?


Following is our settings.
```
2021-06-10 03:44:14,317 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: kubernetes.container.image, */:**.*.**
2021-06-10 03:44:14,317 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: fs.hdfs.hadoopconfig, /opt/flink/conf
2021-06-10 03:44:14,317 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: taskmanager.numberOfTaskSlots, 4
2021-06-10 03:44:14,317 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: kubernetes.rest-service.exposed.type, ClusterIP
2021-06-10 03:44:14,317 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: high-availability.jobmanager.port, 6123
2021-06-10 03:44:14,318 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: akka.ask.timeout, 360s
2021-06-10 03:44:14,318 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: state.backend.rocksdb.memory.write-buffer-ratio, 0.7
2021-06-10 03:44:14,318 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: metrics.reporter.prom.class,
org.apache.flink.metrics.prometheus.PrometheusReporter
2021-06-10 03:44:14,318 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: state.storage.fs.memory-threshold, 1048576
2021-06-10 03:44:14,318 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: execution.checkpointing.unaligned, true
2021-06-10 03:44:14,318 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: web.timeout, 100
2021-06-10 03:44:14,318 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: execution.target, kubernetes-application
2021-06-10 03:44:14,318 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: restart-strategy.fixed-delay.attempts, 2147483647
2021-06-10 03:44:14,319 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.memory.process.size, 8g
2021-06-10 03:44:14,319 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: taskmanager.rpc.port, 6122
2021-06-10 03:44:14,319 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: akka.framesize, 104857600b
2021-06-10 03:44:14,319 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: containerized.master.env.HADOOP_CLASSPATH,
/opt/flink/conf:/opt/hadoop-3.1.1/share/hadoop/common/lib/*:/opt/hadoop-3.1.1/share/hadoop/common/*:/opt/hadoop-3.1.1/share/hadoop/hdfs:/opt/hadoop-3.1.1/share/hadoop/hdfs/lib/*:/opt/hadoop-3.1.1/share/hadoop/hdfs/*:/opt/hadoop-3.1.1/share/hadoop/mapreduce/lib/*:/opt/hadoop-3.1.1/share/hadoop/mapreduce/*:/opt/hadoop-3.1.1/share/hadoop/yarn:/opt/hadoop-3.1.1/share/hadoop/yarn/lib/*:/opt/hadoop-3.1.1/share/hadoop/yarn/*:/contrib/capacity-scheduler/*.jar
2021-06-10 03:44:14,319 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: execution.attached, true
2021-06-10 03:44:14,319 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: internal.cluster.execution-mode, NORMAL
2021-06-10 03:44:14,319 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: high-availability,
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
2021-06-10 03:44:14,320 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property:
execution.checkpointing.externalized-checkpoint-retention,
DELETE_ON_CANCELLATION
2021-06-10 03:44:14,320 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration 

Re: flinksql ttl不生效

2021-06-09 Thread Lincoln Lee
你好,
sql 作业可以尝试设置作业参数 "table.exec.state.ttl" 观察下效果
另外开启 "state.backend.incremental" 也可以减少 checkpoint 的大小
参数说明:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#checkpoints-and-state-backends

lincoln lee


chenchencc <1353637...@qq.com> 于2021年6月8日周二 下午3:11写道:

> 版本:1.12.2
> sql:
> SELECT id, name, message,ts
> SELECT
> ROW_NUMBER() OVER (PARTITION BY name
> ORDER BY ts DESC) AS rowNum
> FROM persons_message_table_kafka
> WHERE rowNum = 1
> 过期时间设置:tableEnv.getConfig().setIdleStateRetention(Duration.ofhour(3));
>
> 问题:checkpoint数据一直在线上增加,一开始90m,然后每天增长20m,但是源数据并没有太多增长
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: JDBC异步lookup join有FLIP或者迭代计划吗?

2021-06-09 Thread Lin Li
社区之前有过基于 legacy source 的 pr
https://issues.apache.org/jira/browse/FLINK-14902, 不过目前没有进展, 欢迎贡献!
cc Guowei Ma


Luna Wong  于2021年6月10日周四 上午11:16写道:

> 如果没有我用VertX和Druid连接池贡献下代码。这个要在dev邮件列表里讨论是吗
>


How to gracefully handle job recovery failures

2021-06-09 Thread Li Peng
Hey folks, we have a cluster with HA mode enabled, and recently after doing
a zookeeper restart, our Kafka cluster (Flink v. 1.11.3, Scala v. 2.12)
crashed and was stuck in a crash loop, with the following error:

2021-06-10 02:14:52.123 [cluster-io-thread-1] ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Fatal error
occurred in the cluster entrypoint.
java.util.concurrent.CompletionException:
org.apache.flink.util.FlinkRuntimeException: Could not recover job with job
id .
at
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at
java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not recover
job with job id .
at
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:149)
at
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:125)
at
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:200)
at
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:115)
at
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 3 common frames omitted
Caused by: org.apache.flink.util.FlinkException: Could not retrieve
submitted JobGraph from state handle under
/. This indicates that the retrieved state
handle is broken. Try cleaning the state handle store.
at
org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:192)
at
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:146)
... 7 common frames omitted
Caused by: java.io.FileNotFoundException: No such file or directory:
s3a://myfolder/recovery/myservice/2021-05-25T04:36:33Z/submittedJobGraph06ea8512c493
at
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
at
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.open(HadoopFileSystem.java:120)
at
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.open(HadoopFileSystem.java:37)
at
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
at
org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
at
org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:65)
at
org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
at
org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:186)
... 8 common frames omitted

We have an idea of why the file might be gone and are addressing it, but my
question is: *how can I configure this in such a way so that a missing job
file doesn't trap the cluster in a forever restart loop?* Is there some
setting to just treat this like a complete fresh deployment if the recovery
file is missing?

Thanks!
Li


Flink1.12 用官方默认的log4j2打日志到kafka,如何区分jobmanager和TaskManager的日志?怎么加参数?

2021-06-09 Thread yujianbo
版本:1.12
框架:用默认的log4j2框架
问题:日志打到kafka,如何去区分jobmanager和taskmanger日志?我发现去改layout.pattern,还是没有能找到区分的好办法?


appender.kafka.layout.type=PatternLayout
appender.kafka.layout.pattern=%d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x -
%m%n -- %t -- %F



--
Sent from: http://apache-flink.147419.n8.nabble.com/


confused about `TO_TIMESTAMP` document description

2021-06-09 Thread Tony Wei
Hi Expert,

this document [1] said `TO_TIMESTAMP` will use the session time zone to
convert date time string into a timestamp.
If I understand correctly, when I set session time zone to `Asia/Shanghai`
and query `SELECT TO_TIMESTAMP('1970-01-01 08:00:00');`,
the result should be epoch timestamp `0` (i.e. '1970-01-01 08:00:00 UTC+8').

TO_TIMESTAMP(string1[, string2])

Converts date time string *string1* with format *string2* (by default:
'-MM-dd HH:mm:ss') under the session time zone (specified by
TableConfig) to a timestamp.

Only supported in blink planner.
However, I found that result is not same as I expected. I tested it by
running the below query under the `Asia/Shanghai` timezone:

SELECT
> CAST(TO_TIMESTAMP(FROM_UNIXTIME(0)) AS BIGINT),

FROM_UNIXTIME(0),

TO_TIMESTAMP(FROM_UNIXTIME(0));


and I got the result like

  EXPR$0EXPR$1EXPR$2
> 28800   1970-01-01 08:00:00  1970-01-01T08:00


The `FROM_UNIXTIME` did convert the epoch timestamp to string format based
on session time zone, but `FROM_UNIXTIME` didn't.
Therefore, I got the `28800` when I cast timestamp into bigint. The result
actually shift 8 hours.

I found this code snippet [2] might be related to `TO_TIMESTAMP` udf, and
seems like it won't set use any timezone configuration, so maybe the
document might be wrong.

Please correct me if I misunderstood something. Thank you.

best regards,

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#temporal-functions
[2]
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java#L322:L377


Re: sink端处理数据很慢

2021-06-09 Thread LakeShen
看下最近是不是流量变大了,以及看下 Sink 的外部存储的集群压力是不是很大。

Best,
LakeShen

田磊  于2021年6月10日周四 上午11:36写道:

> 好的,谢谢,我看看。
>
>
> | |
> totorobabyfans
> |
> |
> 邮箱:totorobabyf...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2021年06月10日 10:50,Lin Li 写道:
> 你好,如果之前一直运行正常,建议检查下 sink 节点慢的原因(io 瓶颈、异常/ 节点 gc 之类的),前面的 map
> 节点应该是被反压导致停滞,可以通过 backpressure tab 确认下
>
> 田磊  于2021年6月9日周三 下午10:39写道:
>
> >
> >
> 提交任务后,通过flink的webui界面看,中间的map算子处理速度很快,13万条数据已经处理。但是sink端只处理了几千条数据,这个时候map端的处理也停滞了,不知道什么原因。map并行度8,sink并行度1。之前也是这样的并行度,并没有出现类似的情况。
> >
> >
> > | |
> > totorobabyfans
> > |
> > |
> > 邮箱:totorobabyf...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
>


JDBC异步lookup join有FLIP或者迭代计划吗?

2021-06-09 Thread Luna Wong
如果没有我用VertX和Druid连接池贡献下代码。这个要在dev邮件列表里讨论是吗


Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

2021-06-09 Thread Joseph Lorenzini




Hi Arvid,
 
I may have figured out the problem. 
 
When using a tumbling window on a keyed stream and event time is being used, does time only move forward when there’s an event with a newer timestamp? Said another way: if I have a 5 second tumbling window, the job would need to consume
 at least two events before a computation would occur: the first event has a timestamp that fits within the 5 second window, the second event has timestamp that exceeds the max timestamp of the previous window.

 
Does that sound right?
 
Thanks,
Joe 
 

From: Arvid Heise 
Date: Wednesday, June 9, 2021 at 8:34 AM
To: Joseph Lorenzini 
Cc: "user@flink.apache.org" 
Subject: Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record


 



Hi Joe,


 


could you please check (in web UI) if the watermark is advancing past the join? The window operator would not trigger if it doesn't advance.


On which Flink version are you running?


 


On Tue, Jun 8, 2021 at 10:13 PM Joseph Lorenzini  wrote:




Hi all,
 
I have observed behavior joining two keyed streams together, where events are never emitted.  The source of each stream is a different kafka topic. I am curious to know if this
 expected and if there’s a way to work around it. 
 
I am using a tumbling event window. All records across the two kafka topics occurred within the same 5 second window of time. Each kafka topic has a single partition.  For each
 kafka topic, I configured the flink kafka consumer like so:
 
   consumer.assignTimestampsAndWatermarks(
    WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withIdleness(Duration.ofSeconds(10))
    );
 
The tumbling window has a duration of 60 seconds. Now it happens to be the case that there is only a single event when joining on a key.  If I use Tumbling Process window then events
 are emitted as expected. If I actually ensure there are multiple events for a key then the events are also emitted. However, if it’s a single event per key in a tumbling event window then no events are emitted.
 
Is this expected and if it how do you handle this use case?
 
Thanks,
Joe


Privileged/Confidential Information may be contained in this message. If you are not the addressee indicated in this message (or responsible for delivery of the message to such person), you may not copy or deliver this message to anyone.
 In such case, you should destroy this message and kindly notify the sender by reply email. Please advise immediately if you or your employer does not consent to Internet email for messages of this kind. Opinions, conclusions and other information in this message
 that do not relate to the official business of my firm shall be understood as neither given nor endorsed by it.





Privileged/Confidential Information may be contained in this message. If you are not the addressee indicated in this message (or responsible for delivery of the message to such person), you may not copy or deliver this message to anyone. In such case, you should
 destroy this message and kindly notify the sender by reply email. Please advise immediately if you or your employer does not consent to Internet email for messages of this kind. Opinions, conclusions and other information in this message that do not relate
 to the official business of my firm shall be understood as neither given nor endorsed by it.




Re: DataStream API in Batch Execution mode

2021-06-09 Thread Marco Villalobos
That worked.  Thank you very much.

On Mon, Jun 7, 2021 at 9:23 PM Guowei Ma  wrote:

> Hi, Macro
>
> I think you could try the `FileSource` and you could find an example from
> [1]. The `FileSource` would scan the file under the given
> directory recursively.
> Would you mind opening an issue for lacking the document?
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
> Best,
> Guowei
>
>
> On Tue, Jun 8, 2021 at 5:59 AM Marco Villalobos 
> wrote:
>
>> How do I use a hierarchical directory structure as a file source in S3
>> when using the DataStream API in Batch Execution mode?
>>
>> I have been trying to find out if the API supports that,
>> because currently our data is organized by years, halves, quarters, months,
>> and but before I launch the job, I flatten the file structure just to
>> process the right set of files.
>>
>>
>>


Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

2021-06-09 Thread Arvid Heise
Hi Joe,

Yes, that is correct! Only when a new record arrives and we know that
timestamp, we can deduce the watermark and advance it. The window operator
would close the old window and open a new one.

Sorry that I haven't seen that immediately. It's sometimes hard to think in
terms of individual records when you are used to think in millions.

On Wed, Jun 9, 2021 at 8:49 PM Joseph Lorenzini 
wrote:

> Hi Arvid,
>
>
>
> I may have figured out the problem.
>
>
>
> When using a tumbling window on a keyed stream and event time is being
> used, does time only move forward when there’s an event with a newer
> timestamp? Said another way: if I have a 5 second tumbling window, the job
> would need to consume at least two events before a computation would occur:
> the first event has a timestamp that fits within the 5 second window, the
> second event has timestamp that exceeds the max timestamp of the previous
> window.
>
>
>
> Does that sound right?
>
>
>
> Thanks,
>
> Joe
>
>
>
> *From: *Arvid Heise 
> *Date: *Wednesday, June 9, 2021 at 8:34 AM
> *To: *Joseph Lorenzini 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Records Are Never Emitted in a Tumbling Event Window When
> Each Key Only Has One Record
>
>
>
> Hi Joe,
>
>
>
> could you please check (in web UI) if the watermark is advancing past the
> join? The window operator would not trigger if it doesn't advance.
>
> On which Flink version are you running?
>
>
>
> On Tue, Jun 8, 2021 at 10:13 PM Joseph Lorenzini 
> wrote:
>
> Hi all,
>
>
>
> I have observed behavior joining two keyed streams together, where events
> are never emitted.  The source of each stream is a different kafka topic. I
> am curious to know if this expected and if there’s a way to work around it.
>
>
>
> I am using a tumbling event window. All records across the two kafka
> topics occurred within the same 5 second window of time. Each kafka topic
> has a single partition.  For each kafka topic, I configured the flink kafka
> consumer like so:
>
>
>
>consumer.assignTimestampsAndWatermarks(
>
> WatermarkStrategy
>
>
> .forBoundedOutOfOrderness(Duration.ofSeconds(10))
>
> .withIdleness(Duration.ofSeconds(10))
>
> );
>
>
>
> The tumbling window has a duration of 60 seconds. Now it happens to be the
> case that there is only a single event when joining on a key.  If I use
> Tumbling Process window then events are emitted as expected. If I actually
> ensure there are multiple events for a key then the events are also
> emitted. However, if it’s a single event per key in a tumbling event window
> then no events are emitted.
>
>
>
> Is this expected and if it how do you handle this use case?
>
>
>
> Thanks,
>
> Joe
>
> Privileged/Confidential Information may be contained in this message. If
> you are not the addressee indicated in this message (or responsible for
> delivery of the message to such person), you may not copy or deliver this
> message to anyone. In such case, you should destroy this message and kindly
> notify the sender by reply email. Please advise immediately if you or your
> employer does not consent to Internet email for messages of this kind.
> Opinions, conclusions and other information in this message that do not
> relate to the official business of my firm shall be understood as neither
> given nor endorsed by it.
>
> Privileged/Confidential Information may be contained in this message. If
> you are not the addressee indicated in this message (or responsible for
> delivery of the message to such person), you may not copy or deliver this
> message to anyone. In such case, you should destroy this message and kindly
> notify the sender by reply email. Please advise immediately if you or your
> employer does not consent to Internet email for messages of this kind.
> Opinions, conclusions and other information in this message that do not
> relate to the official business of my firm shall be understood as neither
> given nor endorsed by it.
>


Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

2021-06-09 Thread Joseph Lorenzini




Hi Arvid,
 
I am on 1.11.2. 
 
The flink job has four operators:
 

Source from kakfa topic one: sent 14 recordsSource from kafka topic two: sent 6 recordsMap: received 15 records/sent 14 recordsMap: received 6 records/sent 6 recordsTumbling Window to Filesink: received 20 records/sent 0 records
 
The watermark is the same for the map operators and the tumbling window, which is to say that between the map and tumbling window the watermark did not advance.

 
Any idea why that might be happening? I did notice that the timestamps for all kafka records are within a fraction of a second of one another. For example:

 

2021-06-09T08:57:00.993-05:002021-06-09T08:57:00.997-05:00
 
I also noted that some kafka records in topic A have the exact same timestamp as records in topic B.

 
Could timestamps not being far enough part (e.g millisecond or more) or two records between two soruces have the exact same time,  cause the watermarks to not advance?
 
 
Joe 
 

From: Arvid Heise 
Date: Wednesday, June 9, 2021 at 8:34 AM
To: Joseph Lorenzini 
Cc: "user@flink.apache.org" 
Subject: Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record


 



Hi Joe,


 


could you please check (in web UI) if the watermark is advancing past the join? The window operator would not trigger if it doesn't advance.


On which Flink version are you running?


 


On Tue, Jun 8, 2021 at 10:13 PM Joseph Lorenzini  wrote:




Hi all,
 
I have observed behavior joining two keyed streams together, where events are never emitted.  The source of each stream is a different kafka topic. I am curious to know if this
 expected and if there’s a way to work around it. 
 
I am using a tumbling event window. All records across the two kafka topics occurred within the same 5 second window of time. Each kafka topic has a single partition.  For each
 kafka topic, I configured the flink kafka consumer like so:
 
   consumer.assignTimestampsAndWatermarks(
    WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withIdleness(Duration.ofSeconds(10))
    );
 
The tumbling window has a duration of 60 seconds. Now it happens to be the case that there is only a single event when joining on a key.  If I use Tumbling Process window then events
 are emitted as expected. If I actually ensure there are multiple events for a key then the events are also emitted. However, if it’s a single event per key in a tumbling event window then no events are emitted.
 
Is this expected and if it how do you handle this use case?
 
Thanks,
Joe


Privileged/Confidential Information may be contained in this message. If you are not the addressee indicated in this message (or responsible for delivery of the message to such person), you may not copy or deliver this message to anyone.
 In such case, you should destroy this message and kindly notify the sender by reply email. Please advise immediately if you or your employer does not consent to Internet email for messages of this kind. Opinions, conclusions and other information in this message
 that do not relate to the official business of my firm shall be understood as neither given nor endorsed by it.





Privileged/Confidential Information may be contained in this message. If you are not the addressee indicated in this message (or responsible for delivery of the message to such person), you may not copy or deliver this message to anyone. In such case, you should
 destroy this message and kindly notify the sender by reply email. Please advise immediately if you or your employer does not consent to Internet email for messages of this kind. Opinions, conclusions and other information in this message that do not relate
 to the official business of my firm shall be understood as neither given nor endorsed by it.




flink sql维表延迟join如何实现?

2021-06-09 Thread casel.chen
延迟join主要是为了解决维表数据后于事实表数据到达问题。java代码可以实现,那flink sql这块能否通过sql hint解决呢?有没有示例?

sink端处理数据很慢

2021-06-09 Thread 田磊
提交任务后,通过flink的webui界面看,中间的map算子处理速度很快,13万条数据已经处理。但是sink端只处理了几千条数据,这个时候map端的处理也停滞了,不知道什么原因。map并行度8,sink并行度1。之前也是这样的并行度,并没有出现类似的情况。


| |
totorobabyfans
|
|
邮箱:totorobabyf...@163.com
|

签名由 网易邮箱大师 定制

Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

2021-06-09 Thread Piotr Nowojski
Yes good catch Kezhu, IllegalStateException sounds very much like
FLINK-21028.

Thomas, could you try upgrading to Flink 1.13.1 or 1.12.4? (1.11.4 hasn't
been released yet)?

Piotrek

wt., 8 cze 2021 o 17:18 Kezhu Wang  napisał(a):

> Could it be same as FLINK-21028[1] (titled as “Streaming application
> didn’t stop properly”, fixed in 1.11.4, 1.12.2, 1.13.0) ?
>
>
> [1]: https://issues.apache.org/jira/browse/FLINK-21028
>
>
> Best,
> Kezhu Wang
>
> On June 8, 2021 at 22:54:10, Yun Gao (yungao...@aliyun.com) wrote:
>
> Hi Thomas,
>
> I tried but do not re-produce the exception yet. I have filed
> an issue for the exception first [1].
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-22928
>
>
> --Original Mail --
> *Sender:*Thomas Wang 
> *Send Date:*Tue Jun 8 07:45:52 2021
> *Recipients:*Yun Gao 
> *CC:*user 
> *Subject:*Re: Re: Re: Failed to cancel a job using the STOP rest API
>
>> This is actually a very simple job that reads from Kafka and writes to S3
>> using the StreamingFileSink w/ Parquet format. I'm all using Flink's API
>> and nothing custom.
>>
>> Thomas
>>
>> On Sun, Jun 6, 2021 at 6:43 PM Yun Gao  wrote:
>>
>>> Hi Thoms,
>>>
>>> Very thanks for reporting the exceptions, and it seems to be not work as
>>> expected to me...
>>> Could you also show us the dag of the job ? And does some operators in
>>> the source task
>>> use multiple-threads to emit records?
>>>
>>> Best,
>>> Yun
>>>
>>>
>>> --Original Mail --
>>> *Sender:*Thomas Wang 
>>> *Send Date:*Sun Jun 6 04:02:20 2021
>>> *Recipients:*Yun Gao 
>>> *CC:*user 
>>> *Subject:*Re: Re: Failed to cancel a job using the STOP rest API
>>>
 One thing I noticed is that if I set drain = true, the job could be
 stopped correctly. Maybe that's because I'm using a Parquet file sink which
 is a bulk-encoded format and only writes to disk during checkpoints?

 Thomas

 On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang  wrote:

> Hi Yun,
>
> Thanks for the tips. Yes, I do see some exceptions as copied below.
> I'm not quite sure what they mean though. Any hints?
>
> Thanks.
>
> Thomas
>
> ```
> 2021-06-05 10:02:51
> java.util.concurrent.ExecutionException:
> org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to
> next operator
> at java.util.concurrent.CompletableFuture.reportGet(
> CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture
> .java:1928)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
> .quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
> .close(StreamOperatorWrapper.java:130)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
> .close(StreamOperatorWrapper.java:134)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
> .close(StreamOperatorWrapper.java:80)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .closeOperators(OperatorChain.java:302)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .afterInvoke(StreamTask.java:576)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:544)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to
> next operator
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
> at org.apache.flink.streaming.api.operators.CountingOutput
> .emitWatermark(CountingOutput.java:41)
> at org.apache.flink.streaming.runtime.operators.
> TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(
> TimestampsAndWatermarksOperator.java:165)
> at org.apache.flink.api.common.eventtime.
> BoundedOutOfOrdernessWatermarks.onPeriodicEmit(
> BoundedOutOfOrdernessWatermarks.java:69)
> at org.apache.flink.streaming.runtime.operators.
> TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator
> .java:125)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
> .lambda$closeOperator$5(StreamOperatorWrapper.java:205)
> at org.apache.flink.streaming.runtime.tasks.
> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor
> .runThrowing(StreamTaskActionExecutor.java:92)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
> .closeOperator(StreamOperatorWrapper.java:203)
> at 

NPE when restoring from savepoint in Flink 1.13.1 application

2021-06-09 Thread 陳昌倬
Hi,

We have NullPointerException when trying to restore from savepoint for
the same jar, or different jar, or different parallelism.  We have
workaround this issue by changing UIDs in almost all operators. We want
to know if there is anyway to avoid this problem so that it will not
cause service maintence problem, thanks.


The following is redacted stack trace we can provide for now:

2021-06-09 13:08:59,849 WARN  
org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - 
Could not execute application:
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Failed to execute job ''.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
 [?:?]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 [?:?]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 
''.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at  ~[?:?]
at  ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) ~[?:?]
at 
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 ~[?:?]
at 
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
... 12 more
Caused by: java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
at 
org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
 ~[?:?]
... 1 more
Caused by: org.apache.flink.runtime.client.JobInitializationException: 
Could not start the JobMaster.
at 
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
 ~[?:?]
at 

PyFlink: Upload resource files to Flink cluster

2021-06-09 Thread Sumeet Malhotra
Hi,

I'm using UDTFs in PyFlink, that depend upon a few resource files (JSON
schema files actually). The path of this file can be passed into the UDTF,
but essentially this path needs to exist on the Task Manager node where the
task executes. What's the best way to upload these resource files? As of
now, my custom Flink image creates a fixed path with the required resource
files, but I'd like it to be run time configurable.

There are 2 APIs available to load files when submitting a PyFlink job...

*stream_execution_environment.add_python_file()* - Recommended to upload
files (.py etc) but doesn't let me configure the final path on the target
node. The files are added to PYTHONPATH, but it needs the UDTF function to
lookup for this file. I'd like to pass the file location into the UDTF
instead.

*stream_execution_environment.add_python_archive()* - Appears to be more
generic, in the sense that it allows a target directory to be specified.
The documentation doesn't say anything about the contents of the archive,
so I'm guessing it could be any type of file. Is this what is needed for my
use case?

Or is there any other recommended way to upload non-Python
dependencies/resources?

Thanks in advance,
Sumeet


回复:flink sql写mysql中文乱码问题

2021-06-09 Thread Jason Lee


同遇到这个问题,看TM的日志的执行Vertic可以看到Cast(_UTF-16LE),然后我们是想往Kafka写入数据,结果写入乱码。


然后想过通过(1)在集群机器的flink-conf.yaml文件中添加:env.java.opts: "-Dfile.encoding=UTF-8”
(2) 通过DECODE()ENCODE()将字符串编码解码成UTF-8或者GBK,都行不通


上述两种方式对乱码数据处理吗但是都是还会出现中文乱码。不知道你尝试过什么方法?有没有成功解决的?


| |
李闯
|
|
jasonlee1...@163.com
|
签名由网易邮箱大师定制


在2021年05月25日 23:31,casel.chen 写道:



我看flink官网介绍Table API & SQL的时候有打印sql执行计划中使用的是UTF-16LE字符集,为什么不用UTF-8呢?乱码会不会跟这个有关?
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/common/#%e8%a7%a3%e9%87%8a%e8%a1%a8



上述例子的结果是:

```text == Abstract Syntax Tree == LogicalUnion(all=[true]) 
LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')]) 
FlinkLogicalDataStreamScan(id=[1], fields=[count, word]) 
FlinkLogicalDataStreamScan(id=[2], fields=[count, word])

== Optimized Logical Plan == DataStreamUnion(all=[true], union all=[count, 
word]) DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE’F%')]) 
DataStreamScan(id=[1], fields=[count, word]) DataStreamScan(id=[2], 
fields=[count, word])

== Physical Execution Plan == Stage 1 : Data Source content : collect elements 
with CollectionInputFormat

















在 2021-05-25 10:40:46,"casel.chen"  写道:
数据库字符编码设置如下


character_set_client,utf8mb4
character_set_connection,utf8mb4
character_set_database,utf8mb4
character_set_filesystem,binary
character_set_results,utf8mb4
character_set_server,utf8
character_set_system,utf8
character_sets_dir,/u01/mysql57_20200229/share/charsets/


客户端连接串是
jdbc:mysql://host:3306/datav_test?useUnicode=true=utf8


本地运行flink sql 作业插入中文是正常显示的,一部署到测试服务器跑就会出现中文乱码。有何修复建议?谢谢!

















在 2021-05-19 17:52:01,"Michael Ran"  写道:



数据库的字段字符编码














在 2021-05-18 18:19:31,"casel.chen"  写道:
我的URL连接串已经使用了  useUnicode=truecharacterEncoding=UTF-8 结果还是会有乱码

















在 2021-05-18 17:21:12,"王炳焱" <15307491...@163.com> 写道:
你在flinkSQL连接mysql表的时候配置url=jdbc:mysql://127.0.0.1:3306/database?useUnicode=truecharacterEncoding=UTF-8,像这样CREATE
 TABLE jdbc_sink(id INT  COMMENT '订单id',goods_name 
VARCHAR(128) COMMENT '商品名称',price DECIMAL(32,2) COMMENT '商品价格',   
 user_name VARCHAR(64) COMMENT '用户名称') WITH (   'connector' = 
'jdbc',   'url' = 
'jdbc:mysql://127.0.0.1:3306/database?useUnicode=truecharacterEncoding=UTF-8',
   'username' = 'mysqluser',   'password' = 'mysqluser',   
'table-name' = 'jdbc_sink')
在 2021-05-18 11:55:46,"casel.chen"  写道:
我的flink sql作业如下


SELECT
product_name,
window_start,
window_end,
CAST(SUM(trans_amt)ASDECIMAL(24,2)) trans_amt,
CAST(COUNT(order_no)ASBIGINT) trans_cnt,
-- LOCALTIMESTAMP AS insert_time,
'微支付事业部'AS bus_name
FROM(


mysql sink表的定义如下
CREATE TABLE XXX (
) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT CHARSET=utf8mb4;


运行起来后写入mysql表的数据带有中文乱码 ??



查看作业运行日志后发现其使用了 UTF-16LE 字符集,有什么办法可以让其使用 utf8mb4 字符集么?
2021-05-17 18:02:25,010 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Received task GroupAggregate(groupBy=[product_name, window_start, 
window_end], select=[product_name, window_start, window_end, 
SUM_RETRACT(trans_amt) AS $f3, COUNT_RETRACT(order_no) AS $f4]) -> 
Calc(select=[CAST(product_name) AS product_name, (CAST(window_start) 
DATE_FORMAT _UTF-16LE'-MM-dd HH:mm:ss') AS window_start, (CAST(window_end) 
DATE_FORMAT _UTF-16LE'-MM-dd HH:mm:ss') AS window_end, CAST($f3) AS 
trans_amt, CAST($f4) AS trans_cnt, CAST(()) AS insert_time, 
_UTF-16LE'??':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
bus_name]) -> Sink: 
Sink(table=[default_catalog.default_database.all_trans_5m_new], 
fields=[product_name, window_start, window_end, trans_amt, trans_cnt, 
insert_time, bus_name]) (1/1)#0 (1b5f26dcd9a6071f36753b93a0ea9bea), deploy into 
slot with allocation id 9f4c7d45bdf429f89158e2f8451663e0.
2021-05-17 18:02:25,013 INFO org.apache.flink.runtime.taskmanager.Task [] - 
GroupAggregate(groupBy=[product_name, window_start, window_end, id, data_type, 
mer_cust_id, order_no, trans_date], select=[product_name, window_start, 
window_end, id, data_type, mer_cust_id, order_no, trans_date, 
MAX_RETRACT(trans_amt) AS trans_amt]) -> Calc(select=[product_name, 
window_start, window_end, trans_amt, order_no]) (1/1)#0 
(ef6b0a94e75cc1665e4ce3d40e74ab0c) switched from CREATED to DEPLOYING.


ProcessFunctionTestHarnesses for testing Python functions

2021-06-09 Thread Bogdan Sulima
Hi all,

in Java/Scala i was using ProcessFunctionTestHarnesses to test my
ProcessFunctions with timers based on event timestamps.
Now i am switching to Python (my favourite language). Is there a similar
TestHarness to support testing Python ProcessFunctions?

Thanks for your answers in advance.

Regards
Bogdan.


Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

2021-06-09 Thread Arvid Heise
Hi Joe,

could you please check (in web UI) if the watermark is advancing past the
join? The window operator would not trigger if it doesn't advance.

On which Flink version are you running?

On Tue, Jun 8, 2021 at 10:13 PM Joseph Lorenzini 
wrote:

> Hi all,
>
>
>
> I have observed behavior joining two keyed streams together, where events
> are never emitted.  The source of each stream is a different kafka topic. I
> am curious to know if this expected and if there’s a way to work around it.
>
>
>
> I am using a tumbling event window. All records across the two kafka
> topics occurred within the same 5 second window of time. Each kafka topic
> has a single partition.  For each kafka topic, I configured the flink kafka
> consumer like so:
>
>
>
>consumer.assignTimestampsAndWatermarks(
>
> WatermarkStrategy
>
>
> .forBoundedOutOfOrderness(Duration.ofSeconds(10))
>
> .withIdleness(Duration.ofSeconds(10))
>
> );
>
>
>
> The tumbling window has a duration of 60 seconds. Now it happens to be the
> case that there is only a single event when joining on a key.  If I use
> Tumbling Process window then events are emitted as expected. If I actually
> ensure there are multiple events for a key then the events are also
> emitted. However, if it’s a single event per key in a tumbling event window
> then no events are emitted.
>
>
>
> Is this expected and if it how do you handle this use case?
>
>
>
> Thanks,
>
> Joe
> Privileged/Confidential Information may be contained in this message. If
> you are not the addressee indicated in this message (or responsible for
> delivery of the message to such person), you may not copy or deliver this
> message to anyone. In such case, you should destroy this message and kindly
> notify the sender by reply email. Please advise immediately if you or your
> employer does not consent to Internet email for messages of this kind.
> Opinions, conclusions and other information in this message that do not
> relate to the official business of my firm shall be understood as neither
> given nor endorsed by it.
>


Flink 日志和输出中文乱码

2021-06-09 Thread Jason Lee
大家好
我们在Flink数据写入Kafka时出现中文乱码问题,之后为了只管确定数据将Sink 
换成Print打印出来,结果发现还是乱码。然后查看TM的日志,发现日志里面的中文也是乱码,而且显示执行的是cast(_UTF-16LE0),这一点也很不解,如下:
"promotions":[{"amount":1000,"stage":1,"scope":0,"itemIndex":[1],"name":"å\u008D¡é¡¹æ\u009D\u0083ç\u009B\u008Aæ\u008Aµæ\u0089£","type":1,"value":1}],"deleted":0,"closeStateDesc":"æ\u009Cªå\u0085³é\u0097­","refundAbleDesc":"","createTime":1623206122000,"reserveOrderId":"","paid":true,"wipeOutAmount":0,"payChannel":13}':VARCHAR(2147483647)
 CHARACTERSET"UTF-16LE") AS dubbo_data2, CAST(_UTF-16LE'æ— 
需支付':VARCHAR(2147483647) CHARACTERSET"UTF-16LE") AS value_1], 
where=[(behavior = _UTF-16LE'CREATED':VARCHAR(2147483647) 
CHARACTERSET"UTF-16LE")]) -> SinkConversionToTuple2 -> Map -> Sink: Unnamed 
(1/1).




我们采用多种方式解决:
(1)在集群机器的flink-conf.yaml文件中添加:env.java.opts: "-Dfile.encoding=UTF-8”
(2) 通过DECODE()ENCODE()将字符串编码解码成UTF-8或者GBK,都行不通


上述方式都行不通,而且都是乱码,请问社区伙伴有遇到过类似问题,有好的解决方案吗?




| |
李闯
|
|
jasonlee1...@163.com
|
签名由网易邮箱大师定制



Re: [table-walkthrough exception] Unable to create a source for reading table...

2021-06-09 Thread Arvid Heise
Hi Lingfeng,

could you try

org.apache.flink

flink-sql-connector-kafka_${scala.binary.version}
${flink.version}



to your pom?

On Wed, Jun 9, 2021 at 5:04 AM Lingfeng Pu  wrote:

> Hi,
>
> I'm following the tutorial to run the "flink-playground/table-walkthrough"
> project on IDEA. However, I got *the exception as follows:*
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Unable to create a source for reading table
> 'default_catalog.default_database.transactions'.
>
> *The key localhost environment info shows below:*
> 1. OS: Fedora 34; 2. Flink version: 1.13.1;
> 3. Java version: 1.8; 4. Maven version: 3.6.3;
> 5. Docker version: 20.10.7 (API version: 1.41).
>
> *The entire error report shows below:*
> /usr/java/jdk1.8.0_291-amd64/bin/java
> -javaagent:/var/lib/snapd/snap/intellij-idea-community/302/lib/idea_rt.jar=46805:/var/lib/snapd/snap/intellij-idea-community/302/bin
> -Dfile.encoding=UTF-8 -classpath
> 

RE: Issue with onTimer method of KeyedProcessFunction

2021-06-09 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hi Zhang,

Please find the code snippet.

private ReducingState aggrRecord; // record being aggregated

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector 
out) throws Exception {
// FIXME timer is not working? or re-registration not working?
NatLogData event = aggrRecord.get();   // Always get null value.


Thanks,
Suchithra


From: JING ZHANG 
Sent: Wednesday, June 9, 2021 2:20 PM
To: V N, Suchithra (Nokia - IN/Bangalore) 
Cc: user@flink.apache.org; Jash, Shaswata (Nokia - IN/Bangalore) 

Subject: Re: Issue with onTimer method of KeyedProcessFunction

Hi Suchithra,
Would you please provide more information in detail or paste the main code?

Best regards,
JING ZHANG

V N, Suchithra (Nokia - IN/Bangalore) 
mailto:suchithra@nokia.com>> 于2021年6月9日周三 下午3:42写道:
Hello,

We are using apache flink 1.7 and now trying to upgrade to flink 1.12.3 
version. After upgrading to 1.12.3 version,  the onTimer method of  
KeyedProcessFunction is not behaving correctly, the value of ReducingState and 
ValueState always return null.

Could you please help in debugging the issue.

Thanks,
Suchithra



Re: Using s3 bucket for high availability

2021-06-09 Thread Kurtis Walker
Thank you, I figured it out.  My IAM policy was missing some actions.  Seems I 
needed to give it “*” for it to work.

From: Tamir Sagi 
Date: Wednesday, June 9, 2021 at 6:02 AM
To: Yang Wang , Kurtis Walker 

Cc: user 
Subject: Re: Using s3 bucket for high availability
EXTERNAL EMAIL

I'd try several things

try accessing the bucket from CLI first locally
 
https://docs.aws.amazon.com/cli/latest/reference/s3/

If it does not work
please check your credentials under ~/.aws/credentials file + ~/.aws/config = 
since the AWS clients read the credentials from these files by default(unless 
other credentials are set)

If everything works well:

  1.  Are you running inside EKS? if so, you must attach the pods a service 
account with corresponded permissions to S3.
  2.  If not,  make sure the pods have the credentials to AWS(access key, 
secret key, region)
Please provide more code snippet.

I recently ran Flink job on Application cluster in EKS. the job also reads 
files from S3. (Without HA)

Tamir

[https://my-email-signature.link/signature.gif?u=1088647=157997735=61757801dda4474f1dcdec8227f1c40523846082fd5a0e52f68e63cfd6b92721]

From: Yang Wang 
Sent: Wednesday, June 9, 2021 11:29 AM
To: Kurtis Walker 
Cc: user 
Subject: Re: Using s3 bucket for high availability


EXTERNAL EMAIL

It seems to be a S3 issue. And I am not sure it is the root cause. Could you 
please share more details of the JobManager log?

Or could you verify that the Flink cluster could access the S3 bucket 
successfully(e.g. store the checkpoint) when HA is disabled?

Best,
Yang

Kurtis Walker mailto:kurtis.wal...@sugarcrm.com>> 
于2021年6月8日周二 下午11:00写道:

Sorry, fat finger send before I finished writing….



Hello,

  I’m trying to set up my flink native Kubernetes cluster with High 
availability.  Here’s the relevant config:



kubernetes.service-account: flink-service-account

high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory

high-availability.storageDir: 
s3://corvana-target-file-upload-k8s-usw2.dev.sugar.build/flink/recovery



I’m getting an error accessing the bucket.



2021-06-08 14:33:42,189 DEBUG com.amazonaws.services.s3.AmazonS3Client  
   [] - Bucket region cache doesn't have an entry for 
corvana-target-file-upload-k8s-usw2.dev.sugar.build. Trying to get bucket 
region from Amazon S3.

2021-06-08 14:33:42,193 DEBUG com.amazonaws.util.json.Jackson   
   [] - Failed to parse JSON string.

com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map 
due to end-of-input

at [Source: (String)""; line: 1, column: 0]

at 
com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
 ~[flink-s3-fs-presto-1.13.0.jar:1.13.0]



Is there an additional config I need for specifying the region for the bucket?  
I’ve been searching the doc and haven’t found anything like that.





From: Kurtis Walker 
mailto:kurtis.wal...@sugarcrm.com>>
Date: Tuesday, June 8, 2021 at 10:55 AM
To: user mailto:user@flink.apache.org>>
Subject: Using s3 bucket for high availability

Hello,

  I’m trying to set up my flink native Kubernetes cluster with High 
availability.  Here’s the relevant config:

Confidentiality: This communication and any attachments are intended for the 
above-named persons only and may be confidential and/or legally privileged. Any 
opinions expressed in this communication are not necessarily those of NICE 
Actimize. If this communication has come to you in error you must take no 
action based on it, nor must you copy or show it to anyone; please 
delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and 
attachments are free from any virus, we advise that in keeping with good 
computing practice the recipient should ensure they are actually virus free.


Re: Persisting state in RocksDB

2021-06-09 Thread Arvid Heise
Hi Paul,

Welcome to the club!

What's your SinkFunction? Is it custom? If so, you could also implement
CheckpointedFunction to read and write data.
Here you could use OperatorStateStore and with it the BroadcastState.
However, broadcast state is quite heavy (it sends all data to all
instances, so it doesn't scale).

A better way would be to have a keyBy+KeyedProcessFunction before the sink
function. You could keyBy your key and use a normal value state [1] to
store the data point. If you configure your state backend to be rocksdb
[2]. Then you have everything together.

Note that you could also have it next to sink function. There is no reason
to not have a dangling operator (no sink).

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#using-keyed-state
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/

On Wed, Jun 9, 2021 at 12:13 AM Paul K Moore  wrote:

> Hi all,
>
> First post here, so please be kind :)
>
> Firstly some context; I have the following high-level job topology:
>
> (1) FlinkPulsarSource -> (2) RichAsyncFunction -> (3) SinkFunction
>
> 1. The FlinkPulsarSource reads event notifications about article updates
> from a Pulsar topic
> 2. The RichAsyncFunction fetches the “full” article from the specified URL
> end-point, and transmutes it into a “legacy” article format
> 3. The SinkFunction writes the “legacy” article to a (legacy) web platform
> i.e. the sink is effectively another web site
>
> I have this all up and running (despite lots of shading fun).
>
> When the SinkFunction creates an article on the legacy platform it returns
> an 'HTTP 201 - Created’ with a Location header suitably populated.
>
> Now, I need to persist that Location header and, more explicitly, need to
> persist a map between the URLs for the new and legacy platforms.  This is
> needed for latter update and delete processing.
>
> The question is how do I store this mapping information?
>
> I’ve spent some time trying to grok state management and the various
> backends, but from what I can see the state management is focused on
> “operator scoped” state.  This seems reasonable given the requirement for
> barriers etc to ensure accurate recovery.
>
> However, I need some persistence between operators (shared state?) and
> with longevity beyond the processing of an operator.
>
> My gut reaction is that I need an external K-V store such as Ignite (or
> similar). Frankly given that Flink ships with embedded RocksDB I was hoping
> to use that, but there seems no obvious way to do this, and lots of advice
> saying don’t :)
>
> Am I missing something obvious here?
>
> Many thanks in advance
>
> Paul
>
>
>


Re: subscribe

2021-06-09 Thread Arvid Heise
To subscribe, please send a mail to user-subscr...@flink.apache.org

On Fri, Jun 4, 2021 at 4:56 AM Boyang Chen 
wrote:

>
>


Re: Flink kafka consumers stopped consuming messages

2021-06-09 Thread Ilya Karpov
Hi Arvid,
thanks for reply,

thread dump + logs research didn’t help. We suggested that problem was in async 
call to remote key-value storage because we (1) found that async client timeout 
was set to 0 (effectively no timeout, idle infinitely), (2) async client 
threads we sleeping, (3) AsyncWaitOperator.Emitter thread was blocked peeking 
new async result while AsyncWaitOperator.processWatermak was blocked to put new 
item in a queue. We changed timeout to non zero value and since then (for a 
week or so) job doesn’t hang. So, I guess the problem was in async client 
timeout (not in kafka or flink).

Hope this helps someone!

> 9 июня 2021 г., в 14:10, Arvid Heise  написал(а):
> 
> Hi Ilya,
> 
> These messages could pop up when a Kafka broker is down but should eventually 
> disappear. So I'm a bit lost. 
> 
> If there was a bug, it's also most likely fixed in the meantime. So if you 
> want to be on the safe side, I'd try to upgrade to more recent versions 
> (Flink + Kafka consumer).
> 
> Best,
> 
> Arvid
> 
> On Wed, Jun 2, 2021 at 7:01 PM Ilya Karpov  > wrote:
> Hi there,
> 
> today I've observed strange behaviour of a flink streaming application (flink 
> 1.6.1, per-job cluster deployment, yarn):
> 3 task managers (2 slots each) are running but only 1 slot is actually 
> consuming messages from kafka (v0.11.0.2), others were idling 
> (currentOutputWatermark was stuck, and 0 numRecordsOutPerSecond metrics). 
> 
> So I started to investigate: 
> - `kafka-run-class.sh kafka.tools.GetOffsetShell` showed that offsets for all 
> 6 topic partitions are constantly increasing.
> - `kafka-consumer-groups.sh` listed only single (the 4th) partition. That 
> makes me thinks that by somehow 5 kafka consumers lost connection to brokers.
> - A LOT of messages "Committing offsets to Kafka takes longer than the 
> checkpoint interval. Skipping commit of previous offsets because newer 
> complete checkpoint offsets are available. This does not compromise Flink's 
> checkpoint integrity." in each task manager instance.
> - 5/6 slot didn’t advanced currentOutputWatermark for about 3 days.
> - no application errors/uncaught exceptions etc.
> - no reconnections to kafka.
> - some network issues connected with hdfs (Slow waitForAckedSeqno).
> - all kafka networking setting are default (e.g. timeouts).
> 
> After job restart all task managers started to consume messages (6 slots in 
> total, and `kafka-consumer-groups.sh` listed that all 6 partitions are 
> consumed).
> 
> May be someone had already experienced something similar?
> 
> Job topology is as follows (no window operations!):
> ```
> val dataStream = env.addSource(kafkaSource).map(processor);
> 
> val terminalStream = AsyncDataStream
> .unorderedWait(dataStream, asyncFun, timout, timeoutUnit)
> .process(sideOutputFun);
> 
> terminalStream
> .keyBy(selector)
> .process(keyProcFun)
> .addSink(kafkaSink_1);
> 
> terminalStream
> .getSideOutput("outputTag")
> .addSink(kafkaSink_2);
> ```



RE: Issue with onTimer method of KeyedProcessFunction

2021-06-09 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hi Zhang,

Please find the code snippet.

private ReducingState aggrRecord; // record being aggregated

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector 
out) throws Exception {
// FIXME timer is not working? or re-registration not working?
NatLogData event = aggrRecord.get();   // Always get null value.


Thanks,
Suchithra

From: JING ZHANG 
Sent: Wednesday, June 9, 2021 2:20 PM
To: V N, Suchithra (Nokia - IN/Bangalore) 
Cc: user@flink.apache.org; Jash, Shaswata (Nokia - IN/Bangalore) 

Subject: Re: Issue with onTimer method of KeyedProcessFunction

Hi Suchithra,
Would you please provide more information in detail or paste the main code?

Best regards,
JING ZHANG

V N, Suchithra (Nokia - IN/Bangalore) 
mailto:suchithra@nokia.com>> 于2021年6月9日周三 下午3:42写道:
Hello,

We are using apache flink 1.7 and now trying to upgrade to flink 1.12.3 
version. After upgrading to 1.12.3 version,  the onTimer method of  
KeyedProcessFunction is not behaving correctly, the value of ReducingState and 
ValueState always return null.

Could you please help in debugging the issue.

Thanks,
Suchithra



Re: behavior differences when sinking side outputs to a custom KeyedCoProcessFunction

2021-06-09 Thread Arvid Heise
Hi Jin,

as you have figured out, if something goes wrong with watermarks it's
usually because of the watermark generator (sorry for not receiving any
feedback whatsoever).

Thank you very much for sharing your solution!

On Thu, Jun 3, 2021 at 8:51 PM Jin Yi  wrote:

> just to resolve this thread, i figured out the issue.  there's a local
> version of a watermark strategy that we use when running locally for
> development that didn't work correctly on many events with the same
> timestamp which the fake data generation that happens for local runs has a
> tendency to do.  fixing the local watermark generator used in the strategy
> to account for this properly fixed all of my issues.
>
> On Fri, May 21, 2021 at 10:09 AM Jin Yi  wrote:
>
>> (sorry that the last sentence fragment made it into my email... it was a
>> draft comment that i forgot to remove.  my thoughts are/were complete in
>> the first message.)
>>
>> i do have follow-up questions/thoughts for this thread though.  given my
>> current setup, it seems it's more expected to have the behavior when i
>> touch the right events given how event based watermarks and kafka connector
>> generated watermarks should work.  a 2 input stream op should fire its
>> timers on the min of the left and right watermark inputs based on what i've
>> read.  so, it seems that my custom keyedcoprocessfunction's onTimer should
>> only fire when a slowest watermark of either input stream reaches the
>> timer's time, correct?  the behavior of things being dropped from the right
>> even stream prematurely (what i originally thought was logically
>> conclusive) based on just the right watermark point would be the incorrect
>> behavior?
>>
>> should i file an issue/bug?
>>
>> On Thu, May 20, 2021 at 3:39 PM Jin Yi  wrote:
>>
>>> hello,
>>>
>>> sorry for a long post, but this is a puzzling problem and i am enough of
>>> a flink non-expert to be unsure what details are important or not.
>>>
>>> background:
>>> i have a flink pipeline that is a series of custom "joins" for the
>>> purposes of user event "flattening" that i wrote a custom
>>> KeyedCoProcessFunction that either joins on a parent id between the two
>>> connected streams using the "left" event's primary key and the foreign key
>>> on the right event OR if the right (child) event doesn't have a foreign
>>> key, tries to infer the join using heuristics to limit the possible parent
>>> events and grabbing the temporally-closest one.  both the inference and
>>> state cleanup for these joins are happening on the onTimer method.
>>>
>>> everything is based on event time, and i'm using kafka connector input
>>> source for the right event inputs to these operators.  here's what the
>>> pipeline looks like, with the joins in question acting like a chain of
>>> joins with the output of the previous join (left input) being joined with a
>>> new raw event source (right input):
>>>
>>> [image: Screen Shot 2021-05-20 at 3.12.22 PM.png]
>>> these join functions have a time window/duration or interval associated
>>> with them to define the duration of join state and inference window.  this
>>> is set per operator to allow for in order and out of order join thresholds
>>> for id based joins, and this window acts as the scope for inference when a
>>> right event that is an inference candidate (missing foreign key id) is
>>> about to be evicted from state.
>>>
>>> problem:
>>>
>>> i have things coded up with side outputs for duplicate, late and dropped
>>> events.  the dropped events case is the one i am focusing on since events
>>> that go unmatched are dropped when they are evicted from state.  only rhs
>>> events are the ones being dropped, with rhs events w/ foreign keys dropped
>>> when they go unmatched (late/no left arrival or no valid inference based
>>> left event).  with a wide enough time duration setting for both in order
>>> and out of order durations, everything gets matched.  however, when testing
>>> things out, i observed (expectedly) that the dropped events increases the
>>> tighter you make the join window based on these durations.  great, this
>>> makes sense.  i wanted to get a better understanding for these durations'
>>> impacts, so i wrote our integration/stress test case to focus on just id
>>> key based joins to start on this.
>>>
>>> further, to help observe the dropping characteristics, i connected the
>>> side outputs to some s3 sinks to store these dropped events.  originally,
>>> these dropped right events were output properly to the s3 output.  for the
>>> integration/stress test setup, they start to appear with durations < 1
>>> minute.
>>>
>>> however, i observed that they didn't include the flink Context.timestamp
>>> encoded in the event structure anywhere (the left events were already
>>> setting the timestamp in the processElement1 method).  i wanted this
>>> information to see how event time processing worked in practice.  so, i
>>> made a similarly simple change to the processElement2 function 

Re: streaming file sink OUT metrics not populating

2021-06-09 Thread Arvid Heise
For reference, the respective FLIP shows the ideas [1]. It's on our agenda
for 1.14.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics

On Thu, Jun 3, 2021 at 6:41 PM Chesnay Schepler  wrote:

> This is a known issue, and cannot be fixed on the user side.
> The underlying problem is that this needs to be implemented separately for
> each source/sink and we haven't gotten around to doing that yet, but some
> progress is being made for 1.14 to that end.
>
> On 6/3/2021 6:06 PM, Vijayendra Yadav wrote:
>
> Hi Team,
>
> I am using streaming file sink and sinking data to s3. Also, using
> Graphite exporter for metrics.
>
> I can see correct counters for Source, Map, Filter functions. But for
> SINK, only* numRecordsIn* is populating, I am looking to get *numRecordsOut
> *counts also, but it's always staying 0 although data is sinking to s3
> correctly.
>
> Any reason for OUT ( *numRecordsOut) *metrics to stay zero all the time?
> Any way to fix it ?
>
> Thanks,
> Vijay
>
>
>


Re: Events triggering JobListener notification

2021-06-09 Thread Arvid Heise
Hi Barak,

I think the answer to your question is lies in the javadoc:

/**
 * Callback on job execution finished, successfully or unsuccessfully.
It is only called back
 * when you call {@code execute()} instead of {@code executeAsync()}
methods of execution
 * environments.
 *
 * Exactly one of the passed parameters is null, respectively for
failure or success.
 */
void onJobExecuted(
@Nullable JobExecutionResult jobExecutionResult, @Nullable
Throwable throwable);


So this callback will be invoked even on failure and cancellation.

On Thu, Jun 3, 2021 at 2:38 PM Barak Ben Nathan 
wrote:

> Hi all,
>
>
>
> I am using Flink 1.12.1
>
>
>
> I’m building a system that creates/cancels Flink Jobs and monitors them.
>
>
>
> We thought to use org.apache.flink.core.execution.JobListener as a ‘push’
> mechanism for job-status-change events.
>
>
>
> We based this idea on the documentation that stated that JobListener ‘…is
> notified on specific job status changed’
>
>
>
> However,  from the JobListener’s methods ‘onJobSubmitted’,
> ‘onJobExecuted’, and their documentation, I understand that JobListener is
> not notified on **all** events.
>
> E.g. :
>
> Job failure (after it running for some time) or Job cancellation, will not
> cause JobListener to be notified.
>
>
>
> Am I correct?
>
>
>
> Barak
>
>
>


Re: Flink kafka consumers stopped consuming messages

2021-06-09 Thread Arvid Heise
Hi Ilya,

These messages could pop up when a Kafka broker is down but should
eventually disappear. So I'm a bit lost.

If there was a bug, it's also most likely fixed in the meantime. So if you
want to be on the safe side, I'd try to upgrade to more recent versions
(Flink + Kafka consumer).

Best,

Arvid

On Wed, Jun 2, 2021 at 7:01 PM Ilya Karpov  wrote:

> Hi there,
>
> today I've observed strange behaviour of a flink streaming application
> (flink 1.6.1, per-job cluster deployment, yarn):
> 3 task managers (2 slots each) are running but only 1 slot is actually
> consuming messages from kafka (v0.11.0.2), others were idling
> (currentOutputWatermark was stuck, and 0 numRecordsOutPerSecond metrics).
>
> So I started to investigate:
> - `kafka-run-class.sh kafka.tools.GetOffsetShell` showed that offsets for
> all 6 topic partitions are constantly increasing.
> - `kafka-consumer-groups.sh` listed only single (the 4th) partition. That
> makes me thinks that by somehow 5 kafka consumers lost connection to
> brokers.
> - A LOT of messages "Committing offsets to Kafka takes longer than the
> checkpoint interval. Skipping commit of previous offsets because newer
> complete checkpoint offsets are available. This does not compromise Flink's
> checkpoint integrity." in each task manager instance.
> - 5/6 slot didn’t advanced currentOutputWatermark for about 3 days.
> - no application errors/uncaught exceptions etc.
> - no reconnections to kafka.
> - some network issues connected with hdfs (Slow waitForAckedSeqno).
> - all kafka networking setting are default (e.g. timeouts).
>
> After job restart all task managers started to consume messages (6 slots
> in total, and `kafka-consumer-groups.sh` listed that all 6 partitions are
> consumed).
>
> May be someone had already experienced something similar?
>
> Job topology is as follows (no window operations!):
> ```
> val dataStream = env.addSource(kafkaSource).map(processor);
>
> val terminalStream = AsyncDataStream
> .unorderedWait(dataStream, asyncFun, timout, timeoutUnit)
> .process(sideOutputFun);
>
> terminalStream
> .keyBy(selector)
> .process(keyProcFun)
> .addSink(kafkaSink_1);
>
> terminalStream
> .getSideOutput("outputTag")
> .addSink(kafkaSink_2);
> ```


回复:sql client提交 flink任务失败

2021-06-09 Thread Fei Han
我现在设置了如下环境变量:

export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export PATH=${HADOOP_HOME}/bin:$PATH
export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
export HBASE_CONF_DIR=/etc/hbase/conf
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`


作业提交yarn web ui是这样的:


进入Flink  web  ui 报错:


不知道是什么原因,请大家帮忙看下


--
发件人:Shengkai Fang 
发送时间:2021年6月9日(星期三) 09:54
收件人:user-zh ; Fei Han 
主 题:Re: sql client提交 flink任务失败

可以看看之前的问题,看看能否解决。

Best,
Shengkai

[1] http://apache-flink.147419.n8.nabble.com/Flink-td7866.html
[2] https://issues.apache.org/jira/browse/FLINK-20780
Fei Han  于2021年6月8日周二 下午8:03写道:

 @all:
 Flink环境:Flink1.13.1
 HADOOP环境:CDH5.15.2
 测试命令如下:./bin/sql-client.sh embedded -i  /root/init_iceberg.sql -f 
/root/hive_catalog.sql
 问题描述:在提交命令后,yarn上面提交成功,Flink1.13.·1 web ui 今天测试出现问题如下:
 2021-06-08 12:02:45
 org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
 at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
 at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
 at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
 at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
 at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
 at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
 at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
 at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
 at akka.actor.Actor.aroundReceive(Actor.scala:517)
 at akka.actor.Actor.aroundReceive$(Actor.scala:515)
 at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
 at akka.actor.ActorCell.invoke(ActorCell.scala:561)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
 at akka.dispatch.Mailbox.run(Mailbox.scala:225)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
 at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/JobConf
 at org.apache.hadoop.hive.conf.HiveConf.initialize(HiveConf.java:4045)
 at org.apache.hadoop.hive.conf.HiveConf.(HiveConf.java:4013)
 at org.apache.iceberg.hive.HiveClientPool.(HiveClientPool.java:45)
 at 
org.apache.iceberg.hive.CachedClientPool.lambda$clientPool$0(CachedClientPool.java:58)
 at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2337)
 at 
java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
 at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2335)
 at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2318)
 at 

Re: Jupyter PyFlink Web UI

2021-06-09 Thread Jeff Zhang
BTW, you can also send email to zeppelin user maillist to join zeppelin
slack channel to discuss more details.
http://zeppelin.apache.org/community.html


Jeff Zhang  于2021年6月9日周三 下午6:34写道:

> Hi Maciek,
>
> You can try zeppelin which support pyflink and display flink job url
> inline.
>
> http://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html
>
>
> Maciej Bryński  于2021年6月9日周三 下午1:53写道:
>
>> Nope.
>> I found the following solution.
>>
>> conf = Configuration()
>> env =
>> StreamExecutionEnvironment(get_gateway().jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf._j_configuration))
>> env_settings =
>> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
>> table_env =
>> StreamTableEnvironment.create(stream_execution_environment=env,
>> environment_settings=env_settings)
>>
>> I also created the bug report
>> https://issues.apache.org/jira/browse/FLINK-22924.
>> I think this API should be exposed in Python.
>>
>> śr., 9 cze 2021 o 04:57 Dian Fu  napisał(a):
>> >
>> > Hi Macike,
>> >
>> > You could try if the following works:
>> >
>> > ```
>> > table_env.get_config().get_configuration().set_string("rest.bind-port",
>> "xxx")
>> > ```
>> >
>> > Regards,
>> > Dian
>> >
>> > > 2021年6月8日 下午8:26,maverick  写道:
>> > >
>> > > Hi,
>> > > I've got a question. I'm running PyFlink code from Jupyter Notebook
>> starting
>> > > TableEnvironment with following code:
>> > >
>> > > env_settings =
>> > >
>> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
>> > > table_env = TableEnvironment.create(env_settings)
>> > >
>> > > How can I enable Web UI in this code?
>> > >
>> > > Regards,
>> > > Maciek
>> > >
>> > >
>> > >
>> > > --
>> > > Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>> >
>>
>>
>> --
>> Maciek Bryński
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


-- 
Best Regards

Jeff Zhang


Re: Jupyter PyFlink Web UI

2021-06-09 Thread Jeff Zhang
Hi Maciek,

You can try zeppelin which support pyflink and display flink job url inline.

http://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html


Maciej Bryński  于2021年6月9日周三 下午1:53写道:

> Nope.
> I found the following solution.
>
> conf = Configuration()
> env =
> StreamExecutionEnvironment(get_gateway().jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf._j_configuration))
> env_settings =
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(stream_execution_environment=env,
> environment_settings=env_settings)
>
> I also created the bug report
> https://issues.apache.org/jira/browse/FLINK-22924.
> I think this API should be exposed in Python.
>
> śr., 9 cze 2021 o 04:57 Dian Fu  napisał(a):
> >
> > Hi Macike,
> >
> > You could try if the following works:
> >
> > ```
> > table_env.get_config().get_configuration().set_string("rest.bind-port",
> "xxx")
> > ```
> >
> > Regards,
> > Dian
> >
> > > 2021年6月8日 下午8:26,maverick  写道:
> > >
> > > Hi,
> > > I've got a question. I'm running PyFlink code from Jupyter Notebook
> starting
> > > TableEnvironment with following code:
> > >
> > > env_settings =
> > >
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> > > table_env = TableEnvironment.create(env_settings)
> > >
> > > How can I enable Web UI in this code?
> > >
> > > Regards,
> > > Maciek
> > >
> > >
> > >
> > > --
> > > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> >
>
>
> --
> Maciek Bryński
>


-- 
Best Regards

Jeff Zhang


回复:退订

2021-06-09 Thread liuhu1993
回复:退订
--
发件人:5599 <673313...@qq.com>
发送时间:2021年6月9日(星期三) 14:44
收件人:user-zh 
主 题:回复:退订

user-zh-unsubscr...@flink.apache.org




--原始邮件--
发件人: "金晓龙"

Re: Using s3 bucket for high availability

2021-06-09 Thread Tamir Sagi

I'd try several things

try accessing the bucket from CLI first locally
 https://docs.aws.amazon.com/cli/latest/reference/s3/

If it does not work
please check your credentials under ~/.aws/credentials file + ~/.aws/config = 
since the AWS clients read the credentials from these files by default(unless 
other credentials are set)

If everything works well:

  1.  Are you running inside EKS? if so, you must attach the pods a service 
account with corresponded permissions to S3.
  2.  If not,  make sure the pods have the credentials to AWS(access key, 
secret key, region)

Please provide more code snippet.

I recently ran Flink job on Application cluster in EKS. the job also reads 
files from S3. (Without HA)

Tamir

[https://my-email-signature.link/signature.gif?u=1088647=157997735=61757801dda4474f1dcdec8227f1c40523846082fd5a0e52f68e63cfd6b92721]

From: Yang Wang 
Sent: Wednesday, June 9, 2021 11:29 AM
To: Kurtis Walker 
Cc: user 
Subject: Re: Using s3 bucket for high availability


EXTERNAL EMAIL


It seems to be a S3 issue. And I am not sure it is the root cause. Could you 
please share more details of the JobManager log?

Or could you verify that the Flink cluster could access the S3 bucket 
successfully(e.g. store the checkpoint) when HA is disabled?

Best,
Yang

Kurtis Walker mailto:kurtis.wal...@sugarcrm.com>> 
于2021年6月8日周二 下午11:00写道:

Sorry, fat finger send before I finished writing….



Hello,

  I’m trying to set up my flink native Kubernetes cluster with High 
availability.  Here’s the relevant config:



kubernetes.service-account: flink-service-account

high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory

high-availability.storageDir: 
s3://corvana-target-file-upload-k8s-usw2.dev.sugar.build/flink/recovery



I’m getting an error accessing the bucket.



2021-06-08 14:33:42,189 DEBUG com.amazonaws.services.s3.AmazonS3Client  
   [] - Bucket region cache doesn't have an entry for 
corvana-target-file-upload-k8s-usw2.dev.sugar.build. Trying to get bucket 
region from Amazon S3.

2021-06-08 14:33:42,193 DEBUG com.amazonaws.util.json.Jackson   
   [] - Failed to parse JSON string.

com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map 
due to end-of-input

at [Source: (String)""; line: 1, column: 0]

at 
com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
 ~[flink-s3-fs-presto-1.13.0.jar:1.13.0]



Is there an additional config I need for specifying the region for the bucket?  
I’ve been searching the doc and haven’t found anything like that.





From: Kurtis Walker 
mailto:kurtis.wal...@sugarcrm.com>>
Date: Tuesday, June 8, 2021 at 10:55 AM
To: user mailto:user@flink.apache.org>>
Subject: Using s3 bucket for high availability

Hello,

  I’m trying to set up my flink native Kubernetes cluster with High 
availability.  Here’s the relevant config:

Confidentiality: This communication and any attachments are intended for the 
above-named persons only and may be confidential and/or legally privileged. Any 
opinions expressed in this communication are not necessarily those of NICE 
Actimize. If this communication has come to you in error you must take no 
action based on it, nor must you copy or show it to anyone; please 
delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and 
attachments are free from any virus, we advise that in keeping with good 
computing practice the recipient should ensure they are actually virus free.


报错分析,readAddress, connection time out。

2021-06-09 Thread yidan zhao
报错日志片段

2021-06-09 17:42:53,873 ERROR
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue [] -
Encountered error while consuming partitions

org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
readAddress(..) failed: Connection timed out

,如上是某个taskmanager的报错,30个taskmanager不好都去看。想知道这个异常指什么超时?需要做什么调整能改善呢。


Re:FlinkSQL join 维表后一定会变成 upsert流吗?

2021-06-09 Thread 东东
你用的是regular join吧,那就必然会变成retract流啊,因为一旦右流有变化,就会将变更影响到的结果输出,不就会有retract么。



join的类型可以看一下文档


在 2021-06-09 16:50:55,"WeiXubin" <18925434...@163.com> 写道:
>请教各位一下,我使用 FlinkSQL 编写任务时,kafka source ->  MySQL sink  不设置主键,查看了一下 request
>mode 是 [INSERT] ,也就是普通的 append 流,这很正常。
>
>但是当我关联上维表后,发现 request mode 变成了 [INSERT, UPDATE_BEFORE, UPDATE_AFTER,
>DELETE],这时异常报错会要求我给 sink 表设置主键,当我设置上主键后就会变成了 upsert 流。
>
>upsert流底层实现原理是 INSERT INTO ... DUPLICATE KEY UPDATE,由于我采用的是阿里云的ADB数据库,该语法在
>ADB 中主键是不支持update的,这会导致报错。且业务上我只想以 append 流的形式插入表。 
>
>请问各位有什么好的解决方案吗,关联上维表但是还是保持 append流?
>
>// 维表
>CREATE TABLE DimTable (
>   //省略字段
>) WITH (
>'connector' = 'jdbc',
>'url' = '***',
>'table-name' = 'v2_dim_game_id',
>'driver' = 'com.mysql.cj.jdbc.Driver',
>'username' = '***',
>'password' = '***',
>'lookup.cache.max-rows'='5000',
>'lookup.cache.ttl' = '60s',
>'lookup.max-retries'='3'
>);
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: State migration for sql job

2021-06-09 Thread Yuval Itzchakov
As my company is also a heavy user of Flink SQL, the state migration story
is very important to us.

I as well believe that adding new fields should start to accumulate state
from the point in time of the change forward.

Is anyone actively working on this? Is there anyway to get involved?

On Tue, Jun 8, 2021, 17:33 aitozi  wrote:

> Thanks for JING & Kurt's reply. I think we prefer to choose the option (a)
> that will not take  the history data into account.
>
> IMO, if we want to process all the historical data, we have to store the
> original data, which may be a big overhead to backend. But if we just
> aggregate after the new added function, may just need a data format
> transfer. Besides, the business logic we met only need the new aggFunction
> accumulate with new data.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


FlinkSQL join 维表后一定会变成 upsert流吗?

2021-06-09 Thread WeiXubin
请教各位一下,我使用 FlinkSQL 编写任务时,kafka source ->  MySQL sink  不设置主键,查看了一下 request
mode 是 [INSERT] ,也就是普通的 append 流,这很正常。

但是当我关联上维表后,发现 request mode 变成了 [INSERT, UPDATE_BEFORE, UPDATE_AFTER,
DELETE],这时异常报错会要求我给 sink 表设置主键,当我设置上主键后就会变成了 upsert 流。

upsert流底层实现原理是 INSERT INTO ... DUPLICATE KEY UPDATE,由于我采用的是阿里云的ADB数据库,该语法在
ADB 中主键是不支持update的,这会导致报错。且业务上我只想以 append 流的形式插入表。 

请问各位有什么好的解决方案吗,关联上维表但是还是保持 append流?

// 维表
CREATE TABLE DimTable (
   //省略字段
) WITH (
'connector' = 'jdbc',
'url' = '***',
'table-name' = 'v2_dim_game_id',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = '***',
'password' = '***',
'lookup.cache.max-rows'='5000',
'lookup.cache.ttl' = '60s',
'lookup.max-retries'='3'
);



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Issue with onTimer method of KeyedProcessFunction

2021-06-09 Thread JING ZHANG
Hi Suchithra,
Would you please provide more information in detail or paste the main code?

Best regards,
JING ZHANG

V N, Suchithra (Nokia - IN/Bangalore) 
于2021年6月9日周三 下午3:42写道:

> Hello,
>
>
>
> We are using apache flink 1.7 and now trying to upgrade to flink 1.12.3
> version. After upgrading to 1.12.3 version,  the onTimer method of
>  KeyedProcessFunction is not behaving correctly, the value of ReducingState
> and ValueState always return null.
>
>
>
> Could you please help in debugging the issue.
>
>
>
> Thanks,
>
> Suchithra
>
>
>


Re: Using s3 bucket for high availability

2021-06-09 Thread Yang Wang
It seems to be a S3 issue. And I am not sure it is the root cause. Could
you please share more details of the JobManager log?

Or could you verify that the Flink cluster could access the S3 bucket
successfully(e.g. store the checkpoint) when HA is disabled?

Best,
Yang

Kurtis Walker  于2021年6月8日周二 下午11:00写道:

> Sorry, fat finger send before I finished writing….
>
>
>
> Hello,
>
>   I’m trying to set up my flink native Kubernetes cluster with High
> availability.  Here’s the relevant config:
>
>
>
> kubernetes.service-account: flink-service-account
>
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>
> high-availability.storageDir:
> s3://corvana-target-file-upload-k8s-usw2.dev.sugar.build/flink/recovery
>
>
>
> I’m getting an error accessing the bucket.
>
>
>
> 2021-06-08 14:33:42,189 DEBUG
> com.amazonaws.services.s3.AmazonS3Client [] - Bucket
> region cache doesn't have an entry for
> corvana-target-file-upload-k8s-usw2.dev.sugar.build. Trying to get bucket
> region from Amazon S3.
>
> 2021-06-08 14:33:42,193 DEBUG
> com.amazonaws.util.json.Jackson  [] - Failed to
> parse JSON string.
>
> com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to
> map due to end-of-input
>
> at [Source: (String)""; line: 1, column: 0]
>
> at
> com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
> ~[flink-s3-fs-presto-1.13.0.jar:1.13.0]
>
>
>
> Is there an additional config I need for specifying the region for the
> bucket?  I’ve been searching the doc and haven’t found anything like that.
>
>
>
>
>
> *From: *Kurtis Walker 
> *Date: *Tuesday, June 8, 2021 at 10:55 AM
> *To: *user 
> *Subject: *Using s3 bucket for high availability
>
> Hello,
>
>   I’m trying to set up my flink native Kubernetes cluster with High
> availability.  Here’s the relevant config:
>


Issue with onTimer method of KeyedProcessFunction

2021-06-09 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hello,

We are using apache flink 1.7 and now trying to upgrade to flink 1.12.3 
version. After upgrading to 1.12.3 version,  the onTimer method of  
KeyedProcessFunction is not behaving correctly, the value of ReducingState and 
ValueState always return null.

Could you please help in debugging the issue.

Thanks,
Suchithra



Re: Questions about implementing a flink source

2021-06-09 Thread Arvid Heise
Hi Evan,

1. I'd recommend supporting DeserializationSchema in any case similar to
KafkaRecordDeserializationSchema.
First, it aligns with other sources and user expectations.
Second, it's a tad faster and the plan looks easier if you omit a chained
task.
Third, you can avoid quite a bit of boilerplate code on user side by having
adapters such that a user can use any existing Flink DeserializationSchema
to deserialize the payload; so without writing any UDF in 80% of the use
cases, the user gets the value that he wants (see
KafkaValueOnlyDeserializationSchemaWrapper).
Lastly, we also plan to have first class support for invalid record
handling at some point and it might be connected to DeserializationSchema.

2. It's any reassignment while there is still data flowing in the execution
graph. It's always a matter if there are parallel roads from source to
sink. As long as there is an old record on the road, sending new records on
a different road has always the potential of new record overtaking old
record.
If you could drain all data (currently not possible) without restarting,
then dynamic reassignment would be safe.

Note that without backpressure, it would certainly be enough to wait a
couple of seconds after unassigning a partition before reassigning it to
avoid any reordering issue. Maybe you could offer a configuration option
and the user has to take some responsibility.

I could also see that we could piggyback on aligned checkpoint barriers to
not emit any data until the checkpoint has been completed and do the
reassignment then. But that's certainly something that the framework should
support and that you don't want to implement on your own.

3. Yes if you throw an IOException (or any other exception), the checkpoint
would not complete and the task gets restarted (could be in an inconsistent
state).

On Tue, Jun 8, 2021 at 10:51 PM Evan Palmer  wrote:

> Hello again,
>
> Thank you for all of your help so far, I have a few more questions if you
> have the time :)
>
> 1. Deserialization Schema
>
> There's been some debate within my team about whether we should offer a
> DeserializationSchema and SerializationSchema in our source and sink.
>
> If we include don't include the schemas, our source and sink would be
> implement Source<...pubsublite.Message> and Sink<...pubsublite.Message>,
> which is the type our client library currently returns (this type is
> serializable), and users could transform the messages in a map function
> after the source. This would make implementing the source somewhat easier,
> and it doesn't seem like it would be much more difficult for users. On the
> other hand, I looked around and didn't find any flink sources implemented
> without a deserialization/serialization schema, so I'm worried that this
> choice might make our source/sink confusing for users, or that we're
> missing something. What are your thoughts on this?
>
> 2. Order aware rebalancing.
>
> I want to make sure I understand the problem with rebalancing partitions
> to different SourceReaders. Does any reassignment of a pub/sub
> partition between SourceReaders have the potential to cause disorder, or
> can order be guaranteed by some variant of ensuring that the partition is
> assigned to only one source reader at a time?
>
> I read through
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/learn-flink/overview/#parallel-dataflows,
> which made me think that if the user wanted a pipeline like
>
> env.fromSource(source).keyBy("Message Partition", ...).sinkTo(sink)
>
> Then if two different source tasks had messages from a single pub/sub
> partition, there could be disorder. We're not planning to implement any
> rebalancing of partitions in our source, but I wanted to make sure I can
> document this correctly :)
>
> 3. Reporting permanent failures in the Sink
>
> Is it sufficient to throw an exception from Committer.commit() in the case
> where our sink has permanently failed in some way (e.g. the configured
> topic has been deleted, or the user doesn't have permissions to publish),
> or is there something else we should be doing?
>
> Evan
>
>
> On Mon, May 10, 2021 at 9:57 AM Arvid Heise  wrote:
>
>> Hi Evan,
>>
>> A few replies / questions inline. Somewhat relatedly, I'm also wondering
>>> where this connector should live. I saw that there's already a pubsub
>>> connector in
>>> https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub,
>>> so if flink is willing to host it, perhaps it could live near there?
>>> Alternatively, it could live alongside our client library in
>>> https://github.com/googleapis/java-pubsublite.
>>>
>>
>> For a long time, the community has been thinking of moving (most)
>> connectors out of the repository. Especially now with the new source/sink
>> interface, the need to decouple Flink release cycle and connector release
>> cycle is bigger 

Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-09 Thread Chirag Dewan
 Thanks for the reply Yun.
The key is an Integer type. Do you think there can be hash collisions for 
Integers?
It somehow works on single TM now. No errors for 1m records.But as soon as we 
move to 2 TMs, we get all sort of errors - 'Position Out of Bound', key not in 
Keygroup etc.
This also causes a NPE in the user defined code -
if (valueState != null)    valueState.value() -> This causes Null, so while the 
if check passed, it caused an NPE while reading the value.

Thanks,Chirag
On Tuesday, 8 June, 2021, 08:29:04 pm IST, Yun Gao  
wrote:  
 
 Hi Chirag,
As far as I know, If you are running a single job, I think all th pods share 
the same state.checkpoints.dir configuration should be as expected, and it is 
not necessary to configuraiton the rocksdb local dir since Flink will chosen a 
default dir.
Regarding the latest exception, I think you might first check the key type used 
and the key type should has a stable hashcode method. 
Best,Yun



 --Original Mail --Sender:Chirag Dewan 
Send Date:Tue Jun 8 18:06:07 2021Recipients:User 
, Yun Gao Subject:Re: Multiple 
Exceptions during Load Test in State Access APIs with RocksDB
 Hi,
Although this looks like a problem to me, I still cant conclude it. 
I tried reducing my TM replicas from 2 to 1 with 4 slots and 4 cores each. I 
was hoping that with single TM there will be file write conflicts. But that 
doesn't seem to be the case as still get the:
Caused by: org.apache.flink.util.SerializedThrowable: 
java.lang.IllegalArgumentException: Key group 2 is not in 
KeyGroupRange{startKeyGroup=64, endKeyGroup=95}.
I have checked that there's no concurrent access on the ValueState.
Any more leads?
Thanks,Chirag

On Monday, 7 June, 2021, 06:56:56 pm IST, Chirag Dewan 
 wrote:  
 
  Hi,
I think I got my issue. Would help if someone can confirm it :)
I am using a NFS filesystem for storing my checkpoints and my Flink cluster is 
running on a K8 with 2 TMs and 2 JMs. 
All my pods share the NFS PVC with state.checkpoint.dir and we also missed 
setting the RocksDB local dir.
Does this lead to state corruption?
Thanks,Chirag


On Monday, 7 June, 2021, 08:54:39 am IST, Chirag Dewan 
 wrote:  
 
  Thanks for the reply Yun. I strangely don't see any nulls. And infact this 
exception comes on the first few records and then job starts processing 
normally.
Also, I don't see any reason for Concurrent access to the state in my code. 
Could more CPU cores than task slots to the Task Manager be the reason for it?
On Saturday, 5 June, 2021, 06:40:27 pm IST, Yun Gao  
wrote:  
 
 Hi Chirag,
If be able to produce the exception, could you first add some logs to printthe 
value of valueState, valueState.value(), inEvent and inEvent.getPriceDelta() ?I 
think either object being null would cause NullPointerException here. 
For the second exception, I found a similar issue[1], caused by concurrent 
access to the value state. Do we have the similar situation here ?
Best,Yun
[1] https://issues.apache.org/jira/browse/FLINK-18587
Best,Yun


 --Original Mail --Sender:Chirag Dewan 
Send Date:Sat Jun 5 20:29:37 2021Recipients:User 
Subject:Multiple Exceptions during Load Test in State 
Access APIs with RocksDB
Hi,

I am getting multiple exceptions while trying to use RocksDB as astate backend. 

I have 2 Task Managers with 2 taskslots and 4 cores each. 

Below is our setup:

 

Kafka(Topic with 2 partitions) ---> FlinkKafkaConsumer(2Parallelism) > 
KeyedProcessFunction(4 Parallelism) > FlinkKafkaProducer(1Parallelism) 
> KafkaTopic

  

public class Aggregator_KeyedExpression extendsKeyedProcessFunction {

 

    private ValueStatevalueState;

 

    @Override

    public void open() throws Exception {

ValueStateDescriptor descriptor =

   new ValueStateDescriptor(

   "totalPrize",Integer.class);

 

    valueState =getRuntimeContext().getState(descriptor);

    }

 

@Override

    public void processElement(GameZoneInputinEvent, Context ctx, final 
List outEvents)

   throws Exception {

 

if(valueState.value() == null) {

   valueState.update(0);

    }

    

valueState.update(valueState.value()+ inEvent.getPrizeDelta()); -> 
NullPointerException on this line

    

int sum =valueState.value();



    GameZoneOutputoutput = new GameZoneOutput();

   output.setPlayerId(inEvent.getPlayerId());

   output.setNetPrize(sum);

   outEvents.add(output);

 

    }

 

    @Override

    public void close() throws Exception {

   valueState.clear();

    }

}
 While doing a load test, I get a NullPointerException in valueState.value(). 
Which seems strange as we would have updated the value state above.
Another strange thing is that this is observed only in load conditions 

??????????

2021-06-09 Thread 5599
user-zh-unsubscr...@flink.apache.org




----
??: "??"

退订

2021-06-09 Thread 金晓龙
退订