How can I mark the end of the stream with an end message while using the new KafkaSource?

2021-11-09 Thread LIU Xiao
With the legacy FlinkKafkaConsumer, overriding the isEndOfStream method of
DeserializationSchema can solve the problem.
But the new KafkaSource ignores that method (it is never called), and the
setUnbounded method seems to accept only offsets or timestamps.
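
For reference, the new builder only supports stopping on offsets or timestamps rather than on a marker record; a minimal hedged sketch of that offset-based alternative (broker address, topic and group id are illustrative):

```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class BoundedKafkaSourceSketch {
    public static void main(String[] args) {
        // Reads "events" up to the latest offsets known at startup and then finishes,
        // instead of relying on an end-of-stream marker message.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // illustrative address
                .setTopics("events")                      // illustrative topic
                .setGroupId("end-of-stream-demo")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setBounded(OffsetsInitializer.latest())  // stop condition: offsets, not a marker
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}
```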


Re: Looking for a Dockerfile for a Flink Docker image based on Oracle JDK 8

2021-11-09 Thread 欧阳武林



 I ran into the same problem. The Dockerfile is below: put the JDK package and the
Dockerfile in the same directory, then run docker build, and the resulting image will include tools such as jstack and jps.




```
# Base image: the official Flink image (OpenJDK-based)
FROM flink

RUN mkdir -p $FLINK_HOME/usrlib
RUN mkdir -p $FLINK_HOME/.kube

# Copy the unpacked Oracle JDK into the image and make it the default JVM,
# so tools such as jstack and jps are available
COPY jdk1.8.0_301 /usr/lib/jdk1.8.0_301
ENV JAVA_HOME /usr/lib/jdk1.8.0_301
ENV PATH ${JAVA_HOME}/bin:$PATH

# Kubernetes client config, owned by the flink user
COPY ./config $FLINK_HOME/.kube
RUN chown flink:flink $FLINK_HOME/.kube/config
RUN chmod 644 $FLINK_HOME/.kube/config
```















欧阳武林
18896723...@139.com


The following is the content of the forwarded email
From:"casel.chen" 
To:"user-zh@flink.apache.org" 
Date:2021-11-09 16:49:35
Subject: Looking for a Dockerfile for a Flink Docker image based on Oracle JDK 8

I checked the official Flink Docker image (https://github.com/apache/flink-docker).
It is based on OpenJDK; the image is small, but it lacks many tools such as jstack,
jps, jstat and jmap, which come in handy when a job runs into trouble. How can I
switch to Oracle JDK 8? A Dockerfile would be much appreciated. Thanks!





Re: FlinkSQL streaming sink writing ORC data to Hive: how to control the number of files?

2021-11-09 Thread yidan zhao
Also, how is late data handled when partitioning by event time? In the DataStream
case, window operators drop late data. For a Flink SQL job that writes from Kafka to
Hive and relies only on event time for partitioning, how does late data behave?

yidan zhao wrote on Wed, Nov 10, 2021 at 1:03 PM:

> Another question: files written to HDFS are named with a leading '.', but recently I
> noticed some starting with '..'. What is the difference between the '..' and '.'
> prefixes? Does '..' mean the file is no longer used?
>
> For example, with checkpoints ckpt1, ckpt2, ... followed by a failure, the job
> restarts from ckpt2. Are the data files produced after ckpt2 then renamed with a '..'
> prefix to mark them as discarded, and new '.'-prefixed files created after the
> restart? Is that how it works?
>
> yidan zhao wrote on Tue, Nov 9, 2021 at 10:50 AM:
>
>> Any advice on performance and stability for writing ORC data to Hive with Flink SQL?
>>
>> For example, what parallelism is reasonable? Currently the compact-coordinator
>> parallelism is fixed at 1 (apparently not changeable) and compact-operator is 60;
>> in daily operation compact-operator is often red, with busy at 100%. The current
>> problem is that checkpoints occasionally fail or are delayed, so files are not
>> compacted and we run out of inodes (strictly speaking, our inode quota is exhausted).
>>
>> There are four task nodes: source, compact-coordinator, compact-operator and
>> partition-committer. For the ones whose parallelism can be set, what should it be
>> based on?
>> For the source I mainly consider data volume; I'm not sure what the compact-operator
>> parallelism should mainly be based on. Is it also data volume?
>>
>> Also, would it be possible to split kafka-to-hdfs and compaction into two separate
>> pipelines so they don't affect each other?
>>
>> Caizhi Weng wrote on Fri, Nov 5, 2021 at 1:35 PM:
>>
>>> Hi!
>>>
>>> 1. In other words, for each checkpoint, the files produced by multiple parallel
>>> subtasks are merged, right?
>>>
>>> Correct.
>>>
>>> 2. Beyond that, files from different checkpoints cannot be merged, right?
>>>
>>> Correct.
>>>
>>> Some nodes are actually doing background I/O, so that may not show up in the busy metric.
>>>
>>> Yes. Busy is computed by sampling how many threads are currently working. For
>>> sink-like nodes whose threads are waiting on background I/O, the busy value will
>>> indeed not be high.
>>>
>>> yidan zhao wrote on Thu, Nov 4, 2021 at 5:57 PM:
>>>
>>> > Hi, a follow-up question about the compaction mechanism. The documentation
>>> > describes it as follows:
>>> > "Whether to enable automatic compaction in streaming sink or not. The data
>>> > will be written to temporary files. After the checkpoint is completed, the
>>> > temporary files generated by a checkpoint will be compacted. The temporary
>>> > files are invisible before compaction."
>>> > As I read it, after each checkpoint completes, the files produced by that
>>> > checkpoint are merged; that is, only the files from a single checkpoint get compacted.
>>> > 1. In other words, for each checkpoint, the files produced by multiple parallel
>>> > subtasks are merged, right?
>>> >
>>> > 2. Beyond that, files from different checkpoints cannot be merged, right?
>>> >
>>> > 3. Another question: for Flink SQL writing to Hive in streaming mode, in the web
>>> > UI with compaction disabled almost every node is blue and the data volume is
>>> > small; with compaction enabled, almost everything is still blue with little data,
>>> > but the compact node stays red continuously.
>>> > My understanding is that when writing to Hive, some nodes are doing background
>>> > I/O, which is not reflected in the busy metric: busy only accounts for the
>>> > processing of received elements, not the background work those elements trigger
>>> > in the operator. Is that correct?
>>> > So even if everything looks blue, I should not lower the parallelism, but should
>>> > instead pick a roughly appropriate parallelism based on data volume.
>>> >
>>>
>>
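
For readers skimming this thread: the documentation text quoted above is the description of the 'auto-compaction' table option of the streaming file sink. A hedged Java sketch of where such options are set (table name, path and the 'compaction.file-size' key are illustrative; check the connector docs for the exact Flink/Hive connector version in use):

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CompactionOptionSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Filesystem sink that compacts, per checkpoint, the files produced by the
        // parallel subtasks. 'compaction.file-size' is the target size of the
        // compacted files (assumed key, verify against the connector docs).
        tEnv.executeSql(
                "CREATE TABLE sink_table (" +
                "  id STRING, name STRING, dt STRING" +
                ") PARTITIONED BY (dt) WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 'hdfs:///tmp/sink_table'," +
                "  'format' = 'orc'," +
                "  'auto-compaction' = 'true'," +
                "  'compaction.file-size' = '128MB'" +
                ")");
    }
}
```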


Re: FlinkSQL streaming sink writing ORC data to Hive: how to control the number of files?

2021-11-09 Thread yidan zhao
Another question: files written to HDFS are named with a leading '.', but recently I
noticed some starting with '..'. What is the difference between the '..' and '.'
prefixes? Does '..' mean the file is no longer used?
For example, with checkpoints ckpt1, ckpt2, ... followed by a failure, the job restarts
from ckpt2. Are the data files produced after ckpt2 then renamed with a '..' prefix to
mark them as discarded, and new '.'-prefixed files created after the restart? Is that
how it works?

yidan zhao wrote on Tue, Nov 9, 2021 at 10:50 AM:

> Any advice on performance and stability for writing ORC data to Hive with Flink SQL?
>
> For example, what parallelism is reasonable? Currently the compact-coordinator
> parallelism is fixed at 1 (apparently not changeable) and compact-operator is 60;
> in daily operation compact-operator is often red, with busy at 100%. The current
> problem is that checkpoints occasionally fail or are delayed, so files are not
> compacted and we run out of inodes (strictly speaking, our inode quota is exhausted).
>
> There are four task nodes: source, compact-coordinator, compact-operator and
> partition-committer. For the ones whose parallelism can be set, what should it be
> based on?
> For the source I mainly consider data volume; I'm not sure what the compact-operator
> parallelism should mainly be based on. Is it also data volume?
>
> Also, would it be possible to split kafka-to-hdfs and compaction into two separate
> pipelines so they don't affect each other?
>
> Caizhi Weng wrote on Fri, Nov 5, 2021 at 1:35 PM:
>
>> Hi!
>>
>> 1. In other words, for each checkpoint, the files produced by multiple parallel
>> subtasks are merged, right?
>>
>> Correct.
>>
>> 2. Beyond that, files from different checkpoints cannot be merged, right?
>>
>> Correct.
>>
>> Some nodes are actually doing background I/O, so that may not show up in the busy metric.
>>
>> Yes. Busy is computed by sampling how many threads are currently working. For
>> sink-like nodes whose threads are waiting on background I/O, the busy value will
>> indeed not be high.
>>
>> yidan zhao wrote on Thu, Nov 4, 2021 at 5:57 PM:
>>
>> > Hi, a follow-up question about the compaction mechanism. The documentation
>> > describes it as follows:
>> > "Whether to enable automatic compaction in streaming sink or not. The data
>> > will be written to temporary files. After the checkpoint is completed, the
>> > temporary files generated by a checkpoint will be compacted. The temporary
>> > files are invisible before compaction."
>> > As I read it, after each checkpoint completes, the files produced by that
>> > checkpoint are merged; that is, only the files from a single checkpoint get compacted.
>> > 1. In other words, for each checkpoint, the files produced by multiple parallel
>> > subtasks are merged, right?
>> >
>> > 2. Beyond that, files from different checkpoints cannot be merged, right?
>> >
>> > 3. Another question: for Flink SQL writing to Hive in streaming mode, in the web
>> > UI with compaction disabled almost every node is blue and the data volume is
>> > small; with compaction enabled, almost everything is still blue with little data,
>> > but the compact node stays red continuously.
>> > My understanding is that when writing to Hive, some nodes are doing background
>> > I/O, which is not reflected in the busy metric: busy only accounts for the
>> > processing of received elements, not the background work those elements trigger
>> > in the operator. Is that correct?
>> > So even if everything looks blue, I should not lower the parallelism, but should
>> > instead pick a roughly appropriate parallelism based on data volume.
>> >
>>
>


Re: How to express the datatype of sparksql collect_list(named_struct(...)) in flinksql?

2021-11-09 Thread JING ZHANG
Hi vtygoss,
I'm a little confused.
The UDF could already work well without defining the `DataTypeHint` annotation.
Why do you define the `DataTypeHint` annotation before the input parameter of
the `eval` method?

Best,
JING ZHANG

vtygoss  于2021年11月9日周二 下午8:17写道:

> Hi, JING ZHANG!
>
> Thanks for your many times of help.
>
>
> I already try to use COLLECT(ROW(id, name)) and store the result with type
> String(for POC test).  So I try to define an UDF, and the annotation of
> function eval must be defined as "MULTISET>" as below, otherwise
> exception "..RAW/MAP expected. but MULTISET `EXPR$1` STRING, `EXPR$2` STRING, `EXPR$3` STRING, `EXPR$4` STRING> NOT
> NULL> NOT NULL passed" thrown.
>
>
> And i think this way of UDF's annotation maybe not a convenient way for
> general scenario. I can't define many UDFs for all RowData structures, such
> as Row, Row, Row<.>
>
>
> Is there any way to define the annotation for dynamic RowData structure?
>
> Thanks for your suggestions again.
>
>
> Best Regards!
>
>
> ```
>
> def eval(@DataTypeHint("MULTISET<ROW<" +
>   "vital_sign_id STRING, cdt_vital_sign_index STRING, " +
>   "unit_name STRING, parameter_value STRING, measure_datetime STRING>" +
>   ">") data: JMAP[Row, Integer]): String = {
>   if (data == null || data.size() == 0) {
>     return ""
>   }
>   data.keySet().toArray().mkString(",")
> }
>
> ```
>
>
>
>
>
> 在 2021年11月8日 21:26,JING ZHANG 写道:
>
> Hi Vtygoss,
> You could try the following SQL:
> ```
>
> select COLLECT(ROW(id, name)) as info
>
> from table
>
> group by ...;
>
> ```
>
> In the above sql, the result type of `COLLECT(ROW(id, name))` is
> MULTISET.
>
> `CollectAggFunction` would store the data in a MapState. key is element
> type, represent the row value. value is Integer type, represents the count
> of row.
>
>
> If you need to define a UDF which handles the result from `COLLECT(ROW(id,
> name))`, you could use Map as input parameter type.
>
> The following code is a demo. Hope it helps.
>
> tEnv.registerFunction("TestFunc", TestFunc)
> tEnv.sqlQuery("select TestFunc(COLLECT(ROW(id, name))) as info from table 
> group by ...")
>
> 
>
> @SerialVersionUID(1L)
> object TestFunc extends ScalarFunction {
>   def eval(s: java.util.Map[Row, Integer]): String = s.keySet().mkString("\n")
> }
>
> Best regards,
>
> JING ZHANG
>
> vtygoss  于2021年11月8日周一 下午7:00写道:
>
>> Hi, flink community!
>>
>>
>> I am working on migrating data production pipeline from SparkSQL to
>> FlinkSQL(1.12.0). And i meet a problem about MULTISET>.
>>
>>
>> ```
>>
>> Spark SQL
>>
>>
>> select COLLECT_LIST(named_struct('id', id, 'name', name)) as info
>>
>> from table
>>
>> group by ...;
>>
>>
>> ```
>>
>>
>> - 1. how to express and store this data structure in flink?
>>
>> I tried to express by MULTISET> in FlinkSQL.
>> But it seems that ORC / JSON / AVRO format cann't store this type.
>>
>> - 2.  How to read MULTISET> in FlinkSQL?
>>
>> If i need to define a function, which type should be briged to for
>> MultiSet>?
>>
>>
>> Is there any other way more convenient to solute this problem?
>>
>> Thanks very much for your any suggestions or replies.
>>
>>
>> Best Regards!
>>
>
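
For readers following this thread, a minimal hedged sketch (Java; field names and the illustrative row structure are assumptions) of a scalar function that accepts the MULTISET produced by COLLECT via its default java.util.Map conversion, along the lines discussed above:

```
import java.util.Map;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

// Assumption: the MULTISET of rows bridges to java.util.Map<Row, Integer>,
// where the value is the per-row count, as described in the thread.
public class MultisetToStringFunction extends ScalarFunction {

    public String eval(
            @DataTypeHint("MULTISET<ROW<id STRING, name STRING>>") Map<Row, Integer> data) {
        if (data == null || data.isEmpty()) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Row, Integer> entry : data.entrySet()) {
            if (sb.length() > 0) {
                sb.append(",");
            }
            sb.append(entry.getKey()); // Row#toString of the collected row
        }
        return sb.toString();
    }
}
```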


flink 1.12.1 writing to HDFS leaves inProgress files

2021-11-09 Thread jie han
Hi!
 We are using Flink 1.12.1 with StreamingFileSink writing to HDFS, and we see the
following two issues.
  1: The official documentation explicitly notes that if a job is shut down and
restarted without a savepoint/checkpoint, the last batch of files cannot be recovered
and stays in the inProgress state.
  For example, the path /data/user/test contains three files:
  part-0-0
  part-0-1
  .part-0-2.inprogress.952eb958-dac9-4f2c-b92f-9084ed536a1c
   2: When the job is started again, the bucket part counters always start from 0
again, while the previous run already produced some files.
 After the same job is restarted, the files are:
  part-0-0
  part-0-1
  .part-0-2.inprogress.952eb958-dac9-4f2c-b92f-9084ed536a1c
  .part-0-0.inprogress.0e2f234b-042d-4232-a5f7-c980f04ca82d
  Since part-0-0 already exists, .part-0-0.inprogress.0e2f234b-042d-4232-a5f7-c980f04ca82d
can never be renamed to part-0-0.


Re: Getting mini-cluster logs when debugging pyflink from IDE

2021-11-09 Thread Dian Fu
The logging directory could be configured in one of the following ways:
- environment variable: FLINK_LOG_DIR
- configuration 'env.log.dir' in
$PYTHON_INSTALLATION_DIR/site-packages/pyflink/conf/flink-conf.yaml

You could refer to [1] for more details.

PS: This is only available for 1.13.3+ and 1.14.0+.

Regards,
Dian

[1]
https://github.com/apache/flink/blob/master/flink-python/pyflink/pyflink_gateway_server.py#L124

On Tue, Nov 9, 2021 at 12:52 PM Роман VVvKamper 
wrote:

> Thanks for help, I’ll look into it :)
>
> Two more questions:
>
> Is there a way to configure this path? For example to write logs to a file
> in the working dir?
>
> And is there a way to redirect the logs from the file to stdout?
>
> On 9 Nov 2021, at 09:00, Dian Fu  wrote:
>
> Hi,
>
> The logs should appear in the log file of the TaskManger and you could
> find it under directory $PYTHON_INSTALLATION_DIR/site-packages/pyflink/log/
>
> Regards,
> Dian
>
> On Mon, Nov 8, 2021 at 10:53 PM Роман VVvKamper 
> wrote:
>
>> Hello,
>>
>> I'm trying to debug flink and pyflink job from IDE using mini cluster
>> (local mode).
>>
>> When I do it in Java Flink, everything works like a charm - I can
>> see the Flink mini-cluster logs in the console.
>>
>> But when I run a PyFlink job in local mode (through the IDE or by simply
>> calling python src/pipeline.py) I can see only logs from the Python code in the
>> console but not the mini-cluster logs.
>>
>> Is there a way to enable mini-cluster logs from the Python local mode?
>> Maybe I can contribute some code for it if you help me where to start.
>
>
>


Re: flink 1.13.2: calling Python UDFs from a Java/Scala program submitted in yarn-application mode, does every machine in the YARN cluster need PyFlink installed?

2021-11-09 Thread Dian Fu
FYI

On Wed, Nov 10, 2021 at 9:31 AM Dian Fu  wrote:

> You can also use one of the following approaches:
> - Python libraries [1]: package PyFlink and the other required dependencies and specify them as job dependencies
> - Python archives [2]: build a Python virtual environment with PyFlink and the other dependencies installed in it, and specify it as a dependency
>
>
> Compared with installing directly on the cluster, these approaches require Flink to distribute the files to the cluster nodes at job submission time; if the files are large this adds some overhead and slows down startup.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/dependency_management/#python-libraries
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/dependency_management/#archives
>
> On Tue, Nov 9, 2021 at 2:01 PM Asahi Lee <978466...@qq.com.invalid> wrote:
>
>> Hi!
>>   I'm using Flink 1.13.2 and developing an application with the Java Table API that uses Python
>> UDFs; it is eventually submitted in yarn-application mode. Do I need to install PyFlink on every machine in the YARN cluster, or is there another option?
>
>


Re: flink 1.13.2: calling Python UDFs from a Java/Scala program submitted in yarn-application mode, does every machine in the YARN cluster need PyFlink installed?

2021-11-09 Thread Dian Fu
You can also use one of the following approaches:
- Python libraries [1]: package PyFlink and the other required dependencies and specify them as job dependencies
- Python archives [2]: build a Python virtual environment with PyFlink and the other dependencies installed in it, and specify it as a dependency

Compared with installing directly on the cluster, these approaches require Flink to distribute the files to the cluster nodes at job submission time; if the files are large this adds some overhead and slows down startup.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/dependency_management/#python-libraries
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/dependency_management/#archives

On Tue, Nov 9, 2021 at 2:01 PM Asahi Lee <978466...@qq.com.invalid> wrote:

> Hi!
>   I'm using Flink 1.13.2 and developing an application with the Java Table API that uses Python
> UDFs; it is eventually submitted in yarn-application mode. Do I need to install PyFlink on every machine in the YARN cluster, or is there another option?
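
To illustrate the archive-based alternative above, a hedged sketch from the Java Table API side (the archive path and the interpreter path inside it are illustrative; the 'python.archives' / 'python.executable' keys come from the dependency management docs linked above and should be checked against the exact Flink version in use):

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PythonUdfDependencySketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Ship a pre-built virtual environment (containing PyFlink and the UDF's
        // dependencies) with the job instead of installing PyFlink on every node.
        tEnv.getConfig().getConfiguration()
                .setString("python.archives", "file:///path/to/venv.zip");
        tEnv.getConfig().getConfiguration()
                .setString("python.executable", "venv.zip/venv/bin/python");
    }
}
```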


Re: select records using JDBC with parameters

2021-11-09 Thread Caizhi Weng
Hi!

It is very likely that versions of your Flink client and Flink standalone
cluster do not match. SubtaskStateMapper.DISCARD_EXTRA_STATE is removed
since Flink 1.14 so please make sure that your Flink client version is also
1.14.

Sigalit Eliazov  于2021年11月10日周三 上午5:46写道:

> Hello
>
> i am creating new pipeline which
>
>1. receives info from kafka (mainly the key)
>
>    2. with this key, select information from a DB
>
>    3. writes to kafka the results
>
> Flink is running as a standalone cluster
>
> I am failing on the pipeline deployment when activating step 2 with the 
> following error
>
> [org.apache.flink.runtime.rest.handler.RestHandlerException: Failed to 
> deserialize JobGraph
>
> Caused by: java.lang.IllegalArgumentException: No enum constant 
> org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.DISCARD_EXTRA_STATE
>
> the code:
> PCollection>> input = 
> pipeline.apply("readFromKafka",
> KafkaTransform.readStrFromKafka(
> pipelineUtil.getBootstrapServers(), topic))
> .apply("window", Window.>into(new GlobalWindows()) 
> // Everything into global window.
> 
> .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
> .discardingFiredPanes())
> .apply("S", GroupByKey.create());
> PCollection output = input.apply("read from db", JdbcIO. Iterable>, AnalyticsResult>readAll()
> .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
> "org.postgresql.Driver", PipelineUtil.getDbInfo())
> .withUsername("123")
> .withPassword(PipelineUtil.readCredentials()))
> .withQuery("select *  from a where id = ? order by insert_timestamp 
> limit 5")
> .withParameterSetter(new JdbcIO.PreparedStatementSetter Iterable>>() {
> @Override
> public void setParameters(KV> element,
>   PreparedStatement preparedStatement) 
> throws Exception {
> String nfcId = element.getKey();
> preparedStatement.setString(1, nfcId);
> }
> })
> .withRowMapper(new JdbcIO.RowMapper() {
> public AnalyticsResult mapRow(ResultSet resultSet) throws 
> Exception {
> MyObject obj = new MyObject(
> resultSet.getString("name"),
> );
>
> return obj;
> }
> }).withCoder(SerializableCoder.of(AnalyticsResult.class)));
>
>
> any ideas?
>
> Thanks a lot
>
> S
>
>


Re: Pyflink PyPi build - scala 2.12 compatibility

2021-11-09 Thread Dian Fu
Hi Kamil,

You are right that it comes with JAR packages of scala 2.11 in the PyFlink
package as it has to select one version of JARs to bundle, either 2.11 or
2.12. Whether it works with scala 2.12 depends on how you submit your job.
- If you execute the job locally, then it will use the JARs bundled in the
PyFlink installation by default, that's scala 2.11. However, you could set
the environment variable 'FLINK_HOME' [1] to the directory of a custom
Flink distribution of 2.12 if you want to work with scala 2.12.
- If you execute the job remotely, e.g using `flink run` to submit the job
to a remote session cluster, YARN cluster, etc. Then it depends on the
Flink distribution from which the `flink run` command refers to. If you
want to work with scala 2.12, it should refer to a custom Flink
distribution of 2.12.

Regards,
Dian

[1]
https://github.com/apache/flink/blob/master/flink-python/pyflink/find_flink_home.py#L46

On Wed, Nov 10, 2021 at 3:12 AM Kamil ty  wrote:

> Hello,
>
> Just wanted to verify if the default build of pyflink available from PyPi
> is compatible with flink - scala version 2.12. I have noticed that the PyPi
> pyflink version comes with apache-flink-libraries targeted for scala 2.11
> only and I was wondering if this might be the cause of some issues that I'm
> running into.
>
> Kind regards
> Kamil
>


Re: Access to GlobalJobParameters From DynamicTableSourceFactory

2021-11-09 Thread Caizhi Weng
Hi!

You can't get GLOBAL_JOB_PARAMETERS
from DynamicTableSourceFactory#createDynamicTableSource as far as I know,
because configuration in that context is from table config, which only
contains configuration on table API level.

Could you tell us more about your use case? There are many other ways to
configure a table source for example setting the table configs or passing
table options (the options after the WITH clause in DDL).
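
To illustrate the WITH-clause approach just mentioned, a hedged sketch of a factory that declares and reads a custom option (the option name and factory identifier are illustrative):

```
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class MySourceFactory implements DynamicTableSourceFactory {

    // Supplied in the DDL: WITH ('connector' = 'my-source', 'service.url' = '...')
    private static final ConfigOption<String> SERVICE_URL =
            ConfigOptions.key("service.url").stringType().noDefaultValue();

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        String serviceUrl = helper.getOptions().get(SERVICE_URL);
        // ... build and return the table source using serviceUrl ...
        throw new UnsupportedOperationException("sketch only: " + serviceUrl);
    }

    @Override
    public String factoryIdentifier() {
        return "my-source";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(SERVICE_URL);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
}
```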

Krzysztof Chmielewski  于2021年11月10日周三
上午6:10写道:

> Hi,
> Well no, because ReadableContext does not have  getConfiguration method ;)
>
> So like I wrote, I would like to get the  GLOBAL_JOB_PARAMETERS from
> DynamicTableSourceFactory interface implementations.
> I'm talking about  DynamicTableSourceFactory:: createDynamicTableSource
> method. It seems that I have access only to Table definition fields.
>
> Regards,
> Krzysztof Chmielewski
>
> wt., 9 lis 2021 o 19:02 Francesco Guardiani 
> napisał(a):
>
>> Have you tried this?
>>
>>
>> context.getConfiguration().get(org.apache.flink.configuration.PipelineOptions.GLOBAL_JOB_PARAMETERS)
>>
>>
>>
>> On Tue, Nov 9, 2021 at 3:59 PM Krzysztof Chmielewski <
>> krzysiek.chmielew...@gmail.com> wrote:
>>
>>> Hi,
>>> Is there a way to access GlobalJobParameters registered as
>>> env.getConfig().setGlobalJobParameters(parameters);
>>>
>>> from DynamicTableSourceFactory implementation?
>>> To be more specific
>>> from DynamicTableSourceFactory::createDynamicTableSource method.
>>>
>>> The Context parameter of createDynamicTableSource has access only
>>> to ReadableConfig object which does not have GlobalParameters.
>>>
>>> Cheers,
>>> Krzysztof Chmielewski
>>>
>>


Re: Access to GlobalJobParameters From DynamicTableSourceFactory

2021-11-09 Thread Krzysztof Chmielewski
Hi,
Well no, because ReadableContext does not have  getConfiguration method ;)

So like I wrote, I would like to get the  GLOBAL_JOB_PARAMETERS from
DynamicTableSourceFactory interface implementations.
I'm talking about  DynamicTableSourceFactory:: createDynamicTableSource
method. It seems that I have access only to Table definition fields.

Regards,
Krzysztof Chmielewski

wt., 9 lis 2021 o 19:02 Francesco Guardiani 
napisał(a):

> Have you tried this?
>
>
> context.getConfiguration().get(org.apache.flink.configuration.PipelineOptions.GLOBAL_JOB_PARAMETERS)
>
>
>
> On Tue, Nov 9, 2021 at 3:59 PM Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
>> Hi,
>> Is there a way to access GlobalJobParameters registered as
>> env.getConfig().setGlobalJobParameters(parameters);
>>
>> from DynamicTableSourceFactory implementation?
>> To be more specific
>> from DynamicTableSourceFactory::createDynamicTableSource method.
>>
>> The Context parameter of createDynamicTableSource has access only
>> to ReadableConfig object which does not have GlobalParameters.
>>
>> Cheers,
>> Krzysztof Chmielewski
>>
>


select records using JDBC with parameters

2021-11-09 Thread Sigalit Eliazov
Hello

i am creating new pipeline which

   1. receives info from kafka (mainly the key)

   2. with this key, select information from a DB

   3. writes to kafka the results

Flink is running as a standalone cluster

I am failing on the pipeline deployment when activating step 2 with
the following error

[org.apache.flink.runtime.rest.handler.RestHandlerException: Failed to
deserialize JobGraph

Caused by: java.lang.IllegalArgumentException: No enum constant
org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.DISCARD_EXTRA_STATE

the code:
PCollection>> input =
pipeline.apply("readFromKafka",
KafkaTransform.readStrFromKafka(
pipelineUtil.getBootstrapServers(), topic))
.apply("window", Window.>into(new
GlobalWindows()) // Everything into global window.

.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes())
.apply("S", GroupByKey.create());
PCollection output = input.apply("read from db",
JdbcIO.>, AnalyticsResult>readAll()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"org.postgresql.Driver", PipelineUtil.getDbInfo())
.withUsername("123")
.withPassword(PipelineUtil.readCredentials()))
.withQuery("select *  from a where id = ? order by
insert_timestamp limit 5")
.withParameterSetter(new
JdbcIO.PreparedStatementSetter>>() {
@Override
public void setParameters(KV> element,
  PreparedStatement
preparedStatement) throws Exception {
String nfcId = element.getKey();
preparedStatement.setString(1, nfcId);
}
})
.withRowMapper(new JdbcIO.RowMapper() {
public AnalyticsResult mapRow(ResultSet resultSet) throws
Exception {
MyObject obj = new MyObject(
resultSet.getString("name"),
);

return obj;
}
}).withCoder(SerializableCoder.of(AnalyticsResult.class)));


any ideas?

Thanks a lot

S


Pyflink PyPi build - scala 2.12 compatibility

2021-11-09 Thread Kamil ty
Hello,

Just wanted to verify if the default build of pyflink available from PyPi
is compatible with flink - scala version 2.12. I have noticed that the PyPi
pyflink version comes with apache-flink-libraries targeted for scala 2.11
only and I was wondering if this might be the cause of some issues that I'm
running into.

Kind regards
Kamil


Re: Access to GlobalJobParameters From DynamicTableSourceFactory

2021-11-09 Thread Francesco Guardiani
Have you tried this?

context.getConfiguration().get(org.apache.flink.configuration.PipelineOptions.GLOBAL_JOB_PARAMETERS)



On Tue, Nov 9, 2021 at 3:59 PM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi,
> Is there a way to access GlobalJobParameters registered as
> env.getConfig().setGlobalJobParameters(parameters);
>
> from DynamicTableSourceFactory implementation?
> To be more specific
> from DynamicTableSourceFactory::createDynamicTableSource method.
>
> The Context parameter of createDynamicTableSource has access only
> to ReadableConfig object which does not have GlobalParameters.
>
> Cheers,
> Krzysztof Chmielewski
>


JVM cluster not firing event time window

2021-11-09 Thread Carlos Downey
Hello,

Recently, I've decided to migrate one of my toy projects to Flink 1.14.0
and I realized that JVM cluster behaviour changed. It no longer respects
event time. On Flink 1.12.5 I didn't experience any issues. I tried some
debugging and it seems that InternalTimerServiceImpl's watermark is not
incremented at all. I don't see any issues with my code when running it on
a cluster.

I have provided a code sample that shows the issue. Just put the numbers 1 to 10 into
a topic named "numbers" on a local Kafka cluster. Try compiling it with Flink 1.12.5 /
1.13.3 / 1.14.0 (note: in recent Flink versions KafkaRecordDeserializer
 became KafkaRecordDeserializationSchema).

object Example extends App {
  import org.apache.flink.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val kafkaSource = KafkaSource.builder[Int]()
    .setTopics("numbers")
    .setBootstrapServers("localhost:9092")
    .setGroupId("flink-job")
    .setDeserializer(new KafkaRecordDeserializationSchema[Int] {
      override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[Int]): Unit = {
        out.collect(new String(record.value()).toInt)
      }

      override def getProducedType: TypeInformation[Int] =
        TypeInformation.of(classOf[Int])
    })
    .build()

  env.fromSource(kafkaSource, new WatermarkStrategy[Int] {
      override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[Int] =
        new AscendingTimestampsWatermarks[Int]

      override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[Int] = {
        (element: Int, _: Long) => element
      }
    }, "kafka-source")
    .map(Integer.valueOf(_))
    .keyBy(_ => 1)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(5)))
    .process(new ProcessWindowFunction[Integer, Int, Int, TimeWindow] {
      override def process(key: Int, context: Context, elements: Iterable[Integer], out: Collector[Int]): Unit = {
        out.collect(elements.last)
      }
    })
    .print()
  env.execute()
}

I see no output in Flink 1.13.3 and 1.14.0. The issue doesn't happen with
`fromElements` source -> I concluded that for finite sources all windows
are eventually fired. I wonder what could be causing this - maybe I just
missed some mandatory configuration?


Re: Providing files while application mode deployment

2021-11-09 Thread Piotr Nowojski
Hi Vasily,

Unfortunately no, I don't think there is such an option in your case. With
per job mode, you could try to use the Distributed Cache, it should be
working in streaming as well [1], but this doesn't work in the application
mode, as in that case no code is executed on the JobMaster [2]

Two workarounds that I could propose, which I know are not perfect, are to:
- bundle the configuration file in the jar
- pass the entire configuration as a parameter to the job, e.g. through some JSON
or a base64-encoded parameter (see the sketch after the quoted message below).

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/dataset/overview/#distributed-cache
[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/overview/#overview-and-reference-architecture

wt., 9 lis 2021 o 14:14 Vasily Melnik 
napisał(a):

> Hi all.
>
> While running Flink jobs in application mode on YARN and Kuber, we need to
> provide some configuration files to main class. Is there any option on
> Flink CLI  to copy local files on cluster without manually copying on DFS
> or in docker image, something like *--files* option in spark-submit?
>
>
>
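
A hedged sketch of the second workaround mentioned above (the parameter name "config.base64" is illustrative; the file content would be base64-encoded by the submitter and passed as a plain program argument to `flink run`):

```
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import org.apache.flink.api.java.utils.ParameterTool;

public class ConfigFromArgsSketch {
    public static void main(String[] args) throws Exception {
        // Read the encoded configuration from the program arguments.
        ParameterTool params = ParameterTool.fromArgs(args);
        String encoded = params.getRequired("config.base64");

        // Decode it back into the original file contents.
        String configContents =
                new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);

        // ... parse configContents (JSON, properties, ...) and build the job ...
        System.out.println("config length: " + configContents.length());
    }
}
```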


Access to GlobalJobParameters From DynamicTableSourceFactory

2021-11-09 Thread Krzysztof Chmielewski
Hi,
Is there a way to access GlobalJobParameters registered as
env.getConfig().setGlobalJobParameters(parameters);

from DynamicTableSourceFactory implementation?
To be more specific
from DynamicTableSourceFactory::createDynamicTableSource method.

The Context parameter of createDynamicTableSource has access only
to ReadableConfig object which does not have GlobalParameters.

Cheers,
Krzysztof Chmielewski


Azure blob storage credential configuration in flink on ververica is giving java.lang.ClassNotFoundException

2021-11-09 Thread Samir Vasani


I am working on accessing azure blob storage through flink pipeline.

As per flink documentation
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/azure/
there
are two approaches to implement this.

1)fs.azure.account.key..blob.core.windows.net:
 .

I implemented this approach, but hard-coding access keys is not an acceptable
way per our organization's security strategy, so this approach is not
helpful.

2)fs.azure.account.keyprovider..blob.core.windows.net:
org.apache.flink.fs.azurefs.EnvironmentVariableKeyProvider

**a)** We are using this approach for saving checkpoints and savepoints of
the running job on an Azure blob storage account, say storage1. That means this
approach (this key/value pair combination) is already in use.

**b)** Now we have a requirement to save our CSV/text/XML files
on a different blob storage account, say storage2.

To access this blob storage account I need to provide an access key, and
it needs to be configurable in the same way as mentioned in
point a.

For that I created an application-specific class whose internal logic
(apart from the environment variable name) is the same as EnvironmentVariableKeyProvider.

import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProvider;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProviderException;


public class MyAppEnvironmentVariableKeyProvider  implements KeyProvider  {

   public static final String AZURE_STORAGE_KEY_ENV_VARIABLE =
"AZURE_STORAGE_KEY_MYAPP";


@Override
public String getStorageAccountKey(final String s, final
Configuration configuration)
throws KeyProviderException {
// Currently hardcoding
//String azureStorageKey = "abcdefghijk";
String azureStorageKey = System.getenv(AZURE_STORAGE_KEY_ENV_VARIABLE);

if (azureStorageKey != null) {
return azureStorageKey;
} else {
throw new KeyProviderException(
"Unable to retrieve Azure storage key from environment. \""
+ AZURE_STORAGE_KEY_ENV_VARIABLE
+ "\" not set.");
}
}
}

I declared the configuration in *deployment.ym*l as below

flinkConfiguration:

fs.azure.account.keyprovider.storage1.blob.core.windows.net:
org.apache.flink.fs.azurefs.EnvironmentVariableKeyProvider
fs.azure.account.keyprovider.storage2.blob.core.windows.net:
>- com.myapp.MyAppEnvironmentVariableKeyProvider

//many other configuraiton exists here but not needed for this
problem statement
  kubernetes:
pods:
  affinity: null
  annotations:
prometheus.io/port: '9249'
prometheus.io/scrape: 'true'
  envVars:
- name: AZURE_STORAGE_KEY
  valueFrom:
secretKeyRef:
  key: azure.accesskey
  name: my-storage-secret
- name: AZURE_STORAGE_KEY_MYAPP
  value: >-
abcdefgh
  valueFrom: null

Now when my application tries to access this
fs.azure.account.keyprovider.storage2.blob.core.windows.net property, it
gives me the error below.

org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureException:
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProviderException:
Unable to load key provider class.
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:1086)
~[?:?]
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:538)
~[?:?]
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1358)
~[?:?]
at 
org.apache.flink.fs.azurefs.AbstractAzureFSFactory.createInitializedAzureFS(AbstractAzureFSFactory.java:88)
~[?:?]
at 
org.apache.flink.fs.azurefs.AbstractAzureFSFactory.create(AbstractAzureFSFactory.java:79)
~[?:?]
at 
org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:62)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:505)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:406)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:214)
~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Dongwon Kim
Hi Fabian,

Can you maybe share more about the setup and how you use the AsyncFunction
> with
> the Kafka client?

Oops, I didn't mention in the reply to David that the kafka producer has
nothing to do with the AsyncFunction!
I interact with Redis and a Spring boot app. in the AsyncFunction, not
Kafka.
My pipeline looks like "KafkaSource -> AsyncFunction -> Window ->
(KafkaSink1, KafkaSink2)" and the kafka producer is the last one.

As David already pointed out it could be indeed a Kafka bug but it could
> also
> mean that your defined async function leaks direct memory by not freeing
> some
> resources.

The problem seems to be the KafkaSink2 which I recently added to the
pipeline in order to write large records (~10MB) for a debugging purpose.
I just launched the pipeline without KafkaSink2 to see whether or not my
conjecture is right or wrong.
If so, I'd rather give more direct memory to each task manager to avoid
this problem.

We can definitely improve the metrics for the AsyncFunction and expose the
> current queue size as a followup.

Really looking forward to it. I spent many hours debugging AsyncFunction
without metrics. It would also be great to have the number of timed-out records
as a metric.

Thanks,

Dongwon


On Tue, Nov 9, 2021 at 7:55 PM Fabian Paul  wrote:

> Hi Dongwan,
>
> Can you maybe share more about the setup and how you use the AsyncFunction
> with
> the Kafka client?
>
> As David already pointed out it could be indeed a Kafka bug but it could
> also
> mean that your defined async function leaks direct memory by not freeing
> some
> resources.
>
> We can definitely improve the metrics for the AsyncFunction and expose the
> current queue size as a followup.
>
> Best,
> Fabian


Providing files while application mode deployment

2021-11-09 Thread Vasily Melnik
Hi all.

While running Flink jobs in application mode on YARN and Kuber, we need to
provide some configuration files to main class. Is there any option on
Flink CLI  to copy local files on cluster without manually copying on DFS
or in docker image, something like *--files* option in spark-submit?


Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Dongwon Kim
Hi Jun,

Did you override AsyncFunction#timeout()?  If so, did you call
> resultFuture.complete()/completeExceptionally() in your override?  Not
> calling them can result in checkpoint timeout.

No, I've only called resultFuture.complete() in AsyncFunction.asyncInvoke()
and didn't know much about  AsyncFunction.timeout(). However, looking at
the default implementation of AsyncFunction.timeout(), I'd rather get the
timeout exception as my streaming pipeline is going to be fail-fast which
is what I prefer the most.

I think I understand what you are concerned about but, as I wrote in the
reply to David, the problem seems to be the kafka sink which I recently
added to the pipeline in order to write large records (~10MB) for the
debugging purpose.

Anyway thanks a lot for letting me know the possibility of overriding
AsyncFunction.timeout().

Best,

Dongwon

On Tue, Nov 9, 2021 at 5:53 PM Jun Qin  wrote:

> Hi Dongwon
>
> Did you override AsyncFunction#timeout()?  If so, did you call
> resultFuture.complete()/completeExceptionally() in your override?  Not
> calling them can result in checkpoint timeout.
>
> Thanks
> Jun
>
>
> On Nov 9, 2021, at 7:37 AM, Dongwon Kim  wrote:
>
> Hi David,
>
> There are currently no metrics for the async work-queue size (you should
>> be able to see the queue stats with debug logs enabled though [1]).
>
> Thanks for the input but scraping DEBUG messages into, for example,
> ElasticSearch for monitoring on Grafana is not possible in my current
> environment.
> I just defined two counters in RichAsyncFunction for tracking # sent
> requests and # finished/failed requests, respectively, and used the two
> counters to calculate the inflight requests from Prometheus.
>
> As far as I can tell from looking at the code, the async operator is able
>> to checkpoint even if the work-queue is exhausted.
>
> Oh, I didn't know that! As you pointed out and I'm going to explain below,
> the async operator might not be the source of the problem.
>
> I just hit the same situation and found that
> - # of inflight records are zero when the backpressure is getting high
> - A taskmanager complains the following error message around the time when
> the backpressure is getting high (all the others don't do):
>
>> 2021-11-09 13:20:40,601 ERROR org.apache.kafka.common.utils.KafkaThread
>>   [] - Uncaught exception in thread
>> 'kafka-producer-network-thread | producer-8':
>> java.lang.OutOfMemoryError: Direct buffer memory
>> at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
>> at java.nio.DirectByteBuffer.(DirectByteBuffer.java:118)
>> ~[?:?]
>> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
>> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242) ~[?:?]
>> at sun.nio.ch.IOUtil.write(IOUtil.java:164) ~[?:?]
>> at sun.nio.ch.IOUtil.write(IOUtil.java:130) ~[?:?]
>> at
>> sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493) ~[?:?]
>> at java.nio.channels.SocketChannel.write(SocketChannel.java:507)
>> ~[?:?]
>> at
>> org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>> at
>> org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:60)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>> at
>> org.apache.kafka.common.network.KafkaChannel.send(KafkaChannel.java:429)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>> at
>> org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:399)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>> at
>> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:589)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>> at
>> org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>> at
>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>> at
>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>> at
>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>> at java.lang.Thread.run(Thread.java:829) [?:?]
>>
>
> Can it be the reason why my pipeline is stalled and ends up with the
> checkout timeout? I guess all the upstream tasks might fail to send data to
> the failed kafka producer and records are stacking up in buffers, which
> could result in the back-pressure. If so, is there no mechanism in Flink to
> detect such an error and send it to the job manager for debugging purposes?
>
> Best,
>
> Dongwon
>
>
> On Mon, Nov 8, 2021 at 9:21 PM David Morávek  wrote:
>
>> Hi Dongwon,
>>
>> There are currently no metrics for the 

Re: How to express the datatype of sparksql collect_list(named_struct(...)) in flinksql?

2021-11-09 Thread vtygoss
Hi, JING ZHANG!

Thanks for your many times of help. 


I already tried to use COLLECT(ROW(id, name)) and store the result with type 
String (for a POC test).  So I tried to define a UDF, and the annotation of the 
eval function must be defined as "MULTISET>" as below, otherwise the 
exception "..RAW/MAP expected. but MULTISET NOT NULL> 
NOT NULL passed" is thrown.


And I think this way of annotating the UDF may not be convenient for the general 
scenario. I can't define separate UDFs for all RowData structures, such as Row, 
Row, Row<.>


Is there any way to define the annotation for dynamic RowData structure? 
Thanks for your suggestions again.


Best Regards!


```
def eval(@DataTypeHint("MULTISET<ROW<" +
  "vital_sign_id STRING, cdt_vital_sign_index STRING, " +
  "unit_name STRING, parameter_value STRING, measure_datetime STRING>" +
  ">") data: JMAP[Row, Integer]): String = {
  if (data == null || data.size() == 0) {
    return ""
  }
  data.keySet().toArray().mkString(",")
}
```






在 2021年11月8日 21:26,JING ZHANG 写道:


Hi Vtygoss,
You could try the following SQL:
```
select COLLECT(ROW(id, name)) as info
from table 
group by ...;
```
In the above sql, the result type of `COLLECT(ROW(id, name))` is MULTISET. 
`CollectAggFunction` would store the data in a MapState. key is element type, 
represent the row value. value is Integer type, represents the count of row.


If you need to define a UDF which handles the result from `COLLECT(ROW(id, 
name))`, you could use Map as input parameter type.
The following code is a demo. Hope it helps.
tEnv.registerFunction("TestFunc", TestFunc)
tEnv.sqlQuery("select TestFunc(COLLECT(ROW(id, name))) as info from table group 
by ...")

@SerialVersionUID(1L)
object TestFunc extends ScalarFunction {
 def eval(s: java.util.Map[Row, Integer]): String = s.keySet().mkString("\n")
}
Best regards,
JING ZHANG


vtygoss  于2021年11月8日周一 下午7:00写道:

Hi, flink community!


I am working on migrating a data production pipeline from SparkSQL to 
FlinkSQL (1.12.0), and I've run into a problem with MULTISET>.


```
Spark SQL


select COLLECT_LIST(named_struct('id', id, 'name', name)) as info
from table 
group by ...;


``` 


- 1. How do I express and store this data structure in Flink? 
I tried to express it as MULTISET> in FlinkSQL, but it 
seems that the ORC / JSON / AVRO formats can't store this type. 
- 2.  How do I read MULTISET> in FlinkSQL?  
If I need to define a function, which type should MultiSet> be 
bridged to?  


Is there any other, more convenient way to solve this problem?   
Thanks very much for your any suggestions or replies. 


Best Regards!

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Piotr Nowojski
Hi All,

to me it looks like something deadlocked, maybe due to this OOM error from
Kafka, preventing a Task from making any progress. To confirm, Dongwan, you
could collect stack traces while the job is in such a blocked state.
Deadlocked Kafka could easily explain those symptoms and it would be
visible as an extreme back pressure. Another thing to look at would be if
the job is making any progress or not at all (via for example
numRecordsIn/numRecordsOut metric [1]).

A couple of clarifications.

> What I suspect is the capacity of the asynchronous operation because
limiting the value can cause back-pressure once the capacity is exhausted
[1].
> Although I could increase the value (...)

If you want to decrease the impact of a backpressure, you should decrease
the capacity. Not increase it. The more in-flight records in the system,
the more records need to be processed/persisted in aligned/unaligned
checkpoints.

> As far as I can tell from looking at the code, the async operator is able
to checkpoint even if the work-queue is exhausted.

Yes and no. If the work-queue is full, `AsyncWaitOperator` can be snapshotted,
but it can not be blocked inside the `AsyncWaitOperator#processElement`
method. For a checkpoint to be executed, `AsyncWaitOperator` must finish
processing the current record and return execution to the task thread. If
the work-queue is full, `AsyncWaitOperator` will block inside the
`AsyncWaitOperator#addToWorkQueue` method until the work-queue has
capacity to accept this new element. If what I suspect is happening here is
true, and the job is deadlocked via this Kafka issue, `AsyncWaitOperator`
will be blocked indefinitely in this method.

Best,
Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.14/docs/ops/metrics/#io
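
For context on the capacity setting discussed above, a hedged sketch of where it is passed (the function and stream names are illustrative):

```
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncCapacitySketch {
    // Assumes an existing input stream and a RichAsyncFunction<String, String> lookupFn.
    public static DataStream<String> wire(DataStream<String> input,
                                          RichAsyncFunction<String, String> lookupFn) {
        // timeout = 5s per request; capacity = 100 in-flight requests.
        // A smaller capacity bounds the number of in-flight records that
        // checkpoints have to wait for or persist, as explained above.
        return AsyncDataStream.unorderedWait(input, lookupFn, 5, TimeUnit.SECONDS, 100);
    }
}
```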



wt., 9 lis 2021 o 11:55 Fabian Paul  napisał(a):

> Hi Dongwan,
>
> Can you maybe share more about the setup and how you use the AsyncFunction
> with
> the Kafka client?
>
> As David already pointed out it could be indeed a Kafka bug but it could
> also
> mean that your defined async function leaks direct memory by not freeing
> some
> resources.
>
> We can definitely improve the metrics for the AsyncFunction and expose the
> current queue size as a followup.
>
> Best,
> Fabian


Re: Upgrade from 1.13.1 to 1.13.2/1.13.3 failing

2021-11-09 Thread Dawid Wysakowicz
Hey Sweta,

Sorry I did not get back to you earlier.

Could you explain how you do the upgrade? Do you try to upgrade your
cluster through HA services (e.g. ZooKeeper)? Meaning you bring the
1.13.1 cluster down and start a 1.13.2/3 cluster which you intend to
pick up the job automatically along with the latest checkpoint? Am I
guessing correctly? As far as I can tell we do not support such a way of
upgrading.

The way we support upgrades is via a savepoint/checkpoint. I'd suggest
to either take a savepoint on 1.13.1 and restore[1] the job on 1.13.2
cluster or use an externalized checkpoint created from 1.13.1.

Best,

Dawid

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#starting-a-job-from-a-savepoint

On 22/10/2021 16:39, Chesnay Schepler wrote:
> The only suggestion I can offer is to take a savepoint with 1.13.1 and
> try to restore from that.
>
> We will investigate the problem in
> https://issues.apache.org/jira/browse/FLINK-24621; currently we don't
> know why you are experiencing this issue.
>
> On 22/10/2021 16:02, Sweta Kalakuntla wrote:
>> Hi,
>>
>> We are seeing error while upgrading minor versions from 1.13.1 to
>> 1.13.2. JobManager is unable to recover the checkpoint state. What
>> would be the solution to this issue?
>>
>> Caused by: org.apache.flink.util.FlinkException: Could not retrieve
>> checkpoint 2844 from state handle under
>> checkpointID-0002844. This indicates that the retrieved
>> state handle is broken. Try cleaning the state handle store.
>> at
>> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.retrieveCompletedCheckpoint(DefaultCompletedCheckpointStore.java:309)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.recover(DefaultCompletedCheckpointStore.java:151)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1513)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:190)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:122)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:317)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown
>> Source) ~[?:?]
>> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown
>> Source) ~[?:?]
>> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>> Source) ~[?:?]
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>> ~[?:?]
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> ~[?:?]
>> at java.lang.Thread.run(Unknown Source) ~[?:?]
>> Caused by: java.io.InvalidClassException:
>> org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor;
>> local class incompatible: stream classdesc serialVersionUID =
>> -5544173933105855751, local class serialVersionUID = 1
>> at java.io.ObjectStreamClass.initNonProxy(Unknown Source) ~[?:?]
>> at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) ~[?:?]
>>
>>
>>
>> Thank you,
>>
>> Sweta K
>>
>>  
>>
>>
>



Re: A savepoint was created but the corresponding job didn't terminate successfully.

2021-11-09 Thread Piotr Nowojski
Hi Dongwon,

Thanks for reporting the issue, I've created a ticket for it [1] and we
will analyse and try to fix it soon. In the meantime it should be safe for
you to ignore this problem. If this failure happens only rarely, you can
always retry stop-with-savepoint command and there should be no visible
side effects for you.

Piotrek


[1] https://issues.apache.org/jira/browse/FLINK-24846

wt., 9 lis 2021 o 03:55 Dongwon Kim  napisał(a):

> Hi community,
>
> I failed to stop a job with savepoint with the following message:
>
>> Inconsistent execution state after stopping with savepoint. At least one
>> execution is still in one of the following states: FAILED, CANCELED. A
>> global fail-over is triggered to recover the job
>> 452594f3ec5797f399e07f95c884a44b.
>>
>
> The job manager said
>
>>  A savepoint was created at
>> hdfs://mobdata-flink-hdfs/driving-habits/svpts/savepoint-452594-f60305755d0e
>> but the corresponding job 452594f3ec5797f399e07f95c884a44b didn't terminate
>> successfully.
>
> while complaining about
>
>> Mailbox is in state QUIESCED, but is required to be in state OPEN for put
>> operations.
>>
>
> Is it okay to ignore this kind of error?
>
> Please see the attached files for the detailed context.
>
> FYI,
> - I used the latest 1.14.0
> - I started the job with "$FLINK_HOME"/bin/flink run --target yarn-per-job
> - I couldn't reproduce the exception using the same jar so I might not
> able to provide DUBUG messages
>
> Best,
>
> Dongwon
>
>


Re: Beginner: guidance on long term event stream persistence and replaying

2021-11-09 Thread Piotr Nowojski
Hi Simon,

From the top of my head I do not see a reason why this shouldn't work in
Flink. I'm not sure what your question is here.

For reading both from the FileSource and Kafka at the same time you might
want to take a look at the Hybrid Source [1]. Apart from that there are
FileSource/FileSink and KafaSource that I presume you have already found :)

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/
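
A minimal hedged sketch of that hybrid wiring (paths, topic, broker address and the record type are illustrative; the HybridSource API is available from Flink 1.14 on):

```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;

public class ArchiveThenLiveSourceSketch {
    public static HybridSource<String> build() {
        // Replay the archived events from S3 first ...
        FileSource<String> archive = FileSource
                .forRecordStreamFormat(new TextLineFormat(), new Path("s3://bucket/archive/"))
                .build();

        // ... then continue with the live Kafka topic.
        KafkaSource<String> live = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("events")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        return HybridSource.builder(archive).addSource(live).build();
    }
}
```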

pon., 8 lis 2021 o 22:22 Simon Paradis  napisał(a):

> Hi,
>
> We have an event processing pipeline that populates various reports from
> different Kafka topics and would like to centralize processing in Flink. My
> team is new to Flink but we did some prototyping using Kinesis.
>
> To enable new reporting based on past events, we'd like the ability to
> replay those Kafka events when creating new reports; a capability we don't
> have today.
>
> We ingest the same topics from many Kafka clusters in different
> datacenters and it is not practical to have enough retention on these Kafka
> topics for technical reasons and also practical issues around GDPR
> compliance and Kafka's immutability (it's not an issue today because our
> Kafka retention is short).
>
> So we'd like to archive events into files that we push to AWS S3 along
> with some metadata to help implement GDPR more efficiently. I've looked
> into Avro object container files and it seems like it would work for us.
>
> I was thinking of having a dedicated Flink job reading and archiving to S3
> and somehow plug these S3 files back into a FileSource when a replay is
> needed to backfill new reporting views. S3 would contain Avro container
> files with a pattern like
>
> sourceDC__topicName__MMDDHHMM__NN.data
>
> where files are rolled over every hour or so and "rekeyed" into NN slots
> as per the event key to retain logical order while having reasonable file
> sizes.
>
> I presume someone has already done something similar. Any pointer would be
> great!
>
> --
> Simon Paradis
> paradissi...@gmail.com
>


Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Fabian Paul
Hi Dongwan,

Can you maybe share more about the setup and how you use the AsyncFunction with
the Kafka client?

As David already pointed out it could be indeed a Kafka bug but it could also
mean that your defined async function leaks direct memory by not freeing some
resources.

We can definitely improve the metrics for the AsyncFunction and expose the
current queue size as a followup.

Best,
Fabian
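
Until such a built-in metric exists, the counter-based workaround mentioned earlier in the thread can look roughly like this (a hedged sketch; the metric names and the external lookup call are illustrative placeholders):

```
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class InstrumentedAsyncLookup extends RichAsyncFunction<String, String> {

    private transient Counter sentRequests;
    private transient Counter completedRequests;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register two counters; "in-flight" can then be charted in Prometheus
        // as asyncRequestsSent - asyncRequestsCompleted.
        sentRequests = getRuntimeContext().getMetricGroup().counter("asyncRequestsSent");
        completedRequests = getRuntimeContext().getMetricGroup().counter("asyncRequestsCompleted");
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        sentRequests.inc();
        // externalLookup is a placeholder for the real async client call (e.g. Redis / HTTP).
        CompletableFuture
                .supplyAsync(() -> externalLookup(key))
                .whenComplete((value, error) -> {
                    completedRequests.inc();
                    if (error != null) {
                        resultFuture.completeExceptionally(error);
                    } else {
                        resultFuture.complete(Collections.singleton(value));
                    }
                });
    }

    private String externalLookup(String key) {
        return key; // placeholder
    }
}
```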

Re: Dependency injection for TypeSerializer?

2021-11-09 Thread Krzysztof Chmielewski
Hi,
In my past project I was able to use Spring as a DI provider for Flink
Jobs. It actually saves me a lot of hassle while writing/composing jobs and
process functions.
I was able to use all of Spring's bean annotations along with properties files
managed by Spring, as if it were a "normal" Spring app. The dependencies
that I was injecting via Spring were not serialized/deserialized by Flink,
which was actually something that I wanted to achieve. In some cases it is
very hard or maybe even impossible to make some 3rd party classes
serializable.

Things to highlight here:
1. I did it only for the Stream API; I think it could also work for the Table API,
though.
2. I was loading a Spring context from the ProcessFunction::open method.
I was able to customize, via job parameters, which Spring configuration I
want to load.
After doing this, all fields annotated with @Autowired were injected.
3. I was using standard @Configuration classes.

Issues:
1. Since I was using the operator::open method to load the context, the context
will be loaded a few times, depending on the number of operators deployed on a
particular Task Manager. This, however, could be improved.
2. The important thing here was that all your classes have to be "deployed"
on every Task Manager/Job Manager in order to load them through DI.
We achieved this by using what is called a "job session" cluster, where our
custom Flink Docker image was built in a way that it contains our job jar
with all the needed dependencies.

Because of that, we were not able to use things like AWS EMR or Kinesis.
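
A hedged sketch of the open()-based wiring described above (the configuration class, service interface and bean are illustrative; the context is intentionally loaded locally rather than serialized with the function):

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;

public class SpringWiredFunction extends ProcessFunction<String, String> {

    // Not serialized with the function; injected on each task manager in open().
    @Autowired
    private transient MyService myService;

    @Override
    public void open(Configuration parameters) {
        // Load the Spring context locally and inject the @Autowired fields of this
        // instance. As noted above, this runs once per operator instance and could
        // be improved by caching/sharing the context and closing it in close().
        AnnotationConfigApplicationContext ctx =
                new AnnotationConfigApplicationContext(MySpringConfig.class);
        ctx.getAutowireCapableBeanFactory().autowireBean(this);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        out.collect(myService.enrich(value));
    }

    // Illustrative collaborator and Spring configuration.
    public interface MyService {
        String enrich(String value);
    }

    @org.springframework.context.annotation.Configuration
    public static class MySpringConfig {
        @Bean
        public MyService myService() {
            return value -> "enriched:" + value;
        }
    }
}
```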

Cheers,
Krzysztof Chmielewski

wt., 9 lis 2021 o 06:46 Thomas Weise  napisał(a):

> Hi,
>
> I was looking into a problem that requires a configurable type
> serializer for communication with a schema registry. The service
> endpoint can change, so I would not want to make it part of the
> serializer snapshot but rather resolve it at graph construction time
> (similar to how a Kafka bootstrap URL or JDBC connection URL would not
> be embedded into a checkpoint).
>
> TypeSerializer is instantiated via either TypeInformation or
> TypeSerializerSnapshot. While TypeInformation provides access to
> ExecutionConfig and therefore ability to access parameters from
> GlobalJobParameters that could be provided through the entry point,
> restoreSerializer requires the serializer to be constructed from the
> snapshot state alone.
>
> Ideally there would be a dependency injection mechanism for user code.
> Discussion in [1] indicated there isn't a direct solution. Has anyone
> come across a similar use case and found a way to work around this
> limitation? It might be possible to work with a configuration
> singleton that initializes from a file in a well known location, but
> that depends on the deployment environment and doesn't play nice with
> testing.
>
> Thanks,
> Thomas
>
> [1] https://lists.apache.org/thread/6qbr4b391dcfwxhcvdl066rpv86gpm5o
>


Could not start rest endpoint on any port in port range 8081, but port 8081 is not in use

2021-11-09 Thread Geoff nie
Hi all,
when starting Flink with ./start-cluster.sh I get the error below, but the port is not occupied. Could someone take a look? Thanks.


org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
 [flink-dist_2.11-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:59)
 [flink-dist_2.11-1.13.3.jar:1.13.3]
Caused by: org.apache.flink.util.FlinkException: Could not create the 
DispatcherResourceManagerComponent.
at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:275)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at java.security.AccessController.doPrivileged(Native Method) 
~[?:1.8.0_171]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_171]
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
 ~[flink-shaded-hadoop-2-uber-2.7.5-9.0.jar:2.7.5-9.0]
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
... 2 more
Caused by: java.net.BindException: Could not start rest endpoint on any port in 
port range 8081
at 
org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:234)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at java.security.AccessController.doPrivileged(Native Method) 
~[?:1.8.0_171]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_171]
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
 ~[flink-shaded-hadoop-2-uber-2.7.5-9.0.jar:2.7.5-9.0]
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
 ~[flink-dist_2.11-1.13.3.jar:1.13.3]
... 2 more
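
As a hedged troubleshooting sketch (not a confirmed fix for this particular
cluster): the REST endpoint's port and bind address are configurable, so
widening the bind-port range or double-checking what is actually listening on
the JobManager host can help narrow down the cause. The values below are
illustrative.

```
# flink-conf.yaml (illustrative values)
rest.port: 8081
# Let the REST endpoint fall back to another free port if 8081 cannot be bound
rest.bind-port: 8081-8091
rest.bind-address: 0.0.0.0
```

Running something like `netstat -tlnp | grep 8081` (or `lsof -i :8081`) right
before start-up can also reveal a process that holds the port only briefly,
e.g. a half-stopped cluster from a previous run.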









Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Jun Qin
Hi Dongwon

Did you override AsyncFunction#timeout()?  If so, did you call 
resultFuture.complete()/completeExceptionally() in your override?  Not calling 
them can result in checkpoint timeout.

Thanks
Jun
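
A minimal sketch of what Jun describes, assuming a RichAsyncFunction with String
input and output (names and the commented-out client call are illustrative):

```
import java.util.concurrent.TimeoutException;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class MyAsyncLookup extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        // Fire the request against the external system and complete the future
        // from its callback, e.g.:
        // client.query(input).whenComplete((response, error) -> {
        //     if (error != null) { resultFuture.completeExceptionally(error); }
        //     else { resultFuture.complete(java.util.Collections.singleton(response)); }
        // });
    }

    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) {
        // Crucial: always complete the future here as well, otherwise the element
        // stays in-flight forever and checkpoints cannot complete.
        resultFuture.completeExceptionally(
                new TimeoutException("Async request timed out for: " + input));
    }
}
```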


> On Nov 9, 2021, at 7:37 AM, Dongwon Kim  wrote:
> 
> Hi David,
> 
> There are currently no metrics for the async work-queue size (you should be 
> able to see the queue stats with debug logs enabled though [1]). 
> Thanks for the input but scraping DEBUG messages into, for example, 
> ElasticSearch for monitoring on Grafana is not possible in my current 
> environment.
> I just defined two counters in RichAsyncFunction for tracking # sent requests 
> and # finished/failed requests, respectively, and used the two counters to 
> calculate the inflight requests from Prometheus.
> 
> As far as I can tell from looking at the code, the async operator is able to 
> checkpoint even if the work-queue is exhausted.
> Oh, I didn't know that! As you pointed out and I'm going to explain below, 
> the async operator might not be the source of the problem.
> 
> I just hit the same situation and found that 
> - # of inflight records are zero when the backpressure is getting high
> - A taskmanager complains the following error message around the time when 
> the backpressure is getting high (all the others don't do):
> 2021-11-09 13:20:40,601 ERROR org.apache.kafka.common.utils.KafkaThread   
>  [] - Uncaught exception in thread 'kafka-producer-network-thread 
> | producer-8':
> java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
>         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) ~[?:?]
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242) ~[?:?]
> at sun.nio.ch.IOUtil.write(IOUtil.java:164) ~[?:?]
> at sun.nio.ch.IOUtil.write(IOUtil.java:130) ~[?:?]
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493) 
> ~[?:?]
> at java.nio.channels.SocketChannel.write(SocketChannel.java:507) 
> ~[?:?]
> at 
> org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152)
>  ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:60)
>  ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.common.network.KafkaChannel.send(KafkaChannel.java:429) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:399) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:589) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at org.apache.kafka.common.network.Selector.poll(Selector.java:483) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at java.lang.Thread.run(Thread.java:829) [?:?]
> 
> Can it be the reason why my pipeline is stalled and ends up with the checkpoint 
> timeout? I guess all the upstream tasks might fail to send data to the failed 
> kafka producer and records are stacking up in buffers, which could result in 
> the back-pressure. If so, is there no mechanism in Flink to detect such an 
> error and send it to the job manager for debugging purposes?
> 
> Best,
> 
> Dongwon
> 
> 
> On Mon, Nov 8, 2021 at 9:21 PM David Morávek  wrote:
> Hi Dongwon,
> 
> There are currently no metrics for the async work-queue size (you should be 
> able to see the queue stats with debug logs enabled though [1]). As far as I 
> can tell from looking at the code, the async operator is able to checkpoint 
> even if the work-queue is exhausted.
> 
> Arvid can you please validate the above? (the checkpoints not being blocked 
> by the work queue part)
> 
> [1] 
> https://github.com/apache/flink/blob/release-1.14.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java#L109
>  
> 
> 
> Best,
> D.
> 
> On Sun, Nov 7, 2021 at 10:41 AM Dongwon Kim  wrote:
> Hi community,
> 
> While using Flink's async i/o for interacting with an external system, I got 

Looking for a Dockerfile for a Flink Docker image based on Oracle JDK 8

2021-11-09 Thread casel.chen
I looked at the official Flink Docker image https://github.com/apache/flink-docker —
it is based on OpenJDK, which keeps the image small but leaves out many tools such as jstack, jps, jstat, and jmap.
These tools come in handy when a job runs into trouble. How can I switch to Oracle JDK 8? Could someone share a Dockerfile? Thanks!

Re: Re: Re: Submitting a Flink job throws java.lang.LinkageError

2021-11-09 Thread casel.chen
I tried setting it to provided, but then at runtime the job throws ClassNotFoundError: 
org/apache/kafka/clients/consumer/ConsumerRecord
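
For illustration only (exact artifacts and versions depend on the platform): the
usual trade-off is between marking the Kafka connector as provided in the job
pom — which only works if the platform's classpath really ships
flink-connector-kafka together with its kafka-clients dependency — or
bundling/shading it in the job jar so that only one copy is ever visible to the
child-first class loader. A hedged pom sketch of the first option:

```
<!-- Job pom.xml sketch: rely on the connector already provided by the platform.
     If the cluster classpath does not actually contain kafka-clients,
     ConsumerRecord will be missing at runtime, as observed above. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.5</version>
    <scope>provided</scope>
</dependency>
```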





On 2021-11-09 09:54:59, "WuKong"  wrote:
>Hi:
>Judging from the error log, this is still a class-loading problem: the message says the dependency has already been
>loaded by a different class loader. If the relevant dependency is already available in the production environment,
>it is recommended to set its scope to provided.
>
>Caused by: java.lang.LinkageError: loader constraint violation: loader
>>> (instance of org/apache/flink/util/ChildFirstClassLoader) previously
>>> initiated loading for a different type with name
>>> "org/apache/kafka/clients/consumer/ConsumerRecord"
>
>
>
>---
>Best,
>WuKong
> 
>From: casel.chen
>Sent: 2021-11-08 14:38
>To: user-zh
>Subject: Re: Re: Submitting a Flink job throws java.lang.LinkageError
>The versions are consistent; both are 1.12.5.
> 
> 
> 
> 
>On 2021-11-08 11:11:35, "Shuiqiang Chen"  wrote:
>>Hi,
>>
>>Could you check whether the Kafka client version inside the job jar is the same as the one on the platform?
>>
>>casel.chen  wrote on Fri, Nov 5, 2021 at 11:25 PM:
>>
>>> I submitted a job written with the Streaming API on our company's real-time computing platform and it threw the
>>> exception below. Our platform is primarily Flink SQL based and already bundles the flink-kafka-connector. Since my
>>> job also needs to consume from Kafka, I packaged the same version of the Flink Kafka connector into the job jar as
>>> well. What is causing this, and how do I fix it? Thanks!
>>>
>>>
>>> 2021-11-05 16:38:58 -  [submit-session-executor-6] ERROR
>>> c.h.s.launcher.AbstractJobExecutor - -start job failed-
>>>
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>> caused an error:
>>>
>>>
>>>
>>>
>>>
>>>
>>> Classpath:
>>> [file:/opt/streamsql/jobs/aml-aml-aml/aml-datasync/TEST/aml-datasync-1.0-SNAPSHOT_zwb3274543418822102949.jar]
>>>
>>>
>>>
>>>
>>>
>>>
>>> System.out: (none)
>>>
>>>
>>>
>>>
>>>
>>>
>>> System.err: (none)
>>>
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:264)
>>>
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:172)
>>>
>>>
>>> at
>>> com.huifu.streamsql.launcher.AbstractJobExecutor.createJobGraph(AbstractJobExecutor.java:205)
>>>
>>>
>>> at
>>> com.huifu.streamsql.launcher.standalone.RemoteExecutor.doStart(RemoteExecutor.java:31)
>>>
>>>
>>> at
>>> com.huifu.streamsql.launcher.AbstractJobExecutor.start(AbstractJobExecutor.java:51)
>>>
>>>
>>> at com.huifu.streamsql.launcher.JobCommand$1.execute(JobCommand.java:15)
>>>
>>>
>>> at
>>> com.huifu.streamsql.service.StreamSqlServiceImpl.submitJob(StreamSqlServiceImpl.java:443)
>>>
>>>
>>> at
>>> com.huifu.kunpeng.service.DeploymentServiceImpl.submitJob(DeploymentServiceImpl.java:1662)
>>>
>>>
>>> at
>>> com.huifu.kunpeng.service.DeploymentServiceImpl.launchDeployment(DeploymentServiceImpl.java:1623)
>>>
>>>
>>> at
>>> com.huifu.kunpeng.service.DeploymentServiceImpl$$FastClassBySpringCGLIB$$855501cb.invoke()
>>>
>>>
>>> at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
>>>
>>>
>>> at
>>> org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
>>>
>>>
>>> at
>>> org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
>>>
>>>
>>> at
>>> org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
>>>
>>>
>>> at
>>> org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor.invoke(AnnotationAwareRetryOperationsInterceptor.java:156)
>>>
>>>
>>> at
>>> org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
>>>
>>>
>>> at
>>> org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
>>>
>>>
>>> at
>>> org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
>>>
>>>
>>> at
>>> com.huifu.kunpeng.service.DeploymentServiceImpl$$EnhancerBySpringCGLIB$$9aed5b42.launchDeployment()
>>>
>>>
>>> at
>>> com.huifu.kunpeng.runner.SubmitQueueApplicationRunner.lambda$run$0(SubmitQueueApplicationRunner.java:63)
>>>
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>
>>>
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>> Caused by: java.lang.LinkageError: loader constraint violation: loader
>>> (instance of org/apache/flink/util/ChildFirstClassLoader) previously
>>> initiated loading for a different type with name
>>> "org/apache/kafka/clients/consumer/ConsumerRecord"
>>>
>>>
>>> at java.lang.ClassLoader.defineClass1(Native Method)
>>>
>>>
>>> at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
>>>
>>>
>>> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>>
>>>
>>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>>>
>>>
>>> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>>>
>>>
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>>>
>>>
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>>>
>>>
>>> at java.security.AccessController.doPrivileged(Native Method)


Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread David Morávek
This is definitely a bug on the Kafka side, because they're not handling
uncaught exceptions properly [1]. I don't think there is much we can do on
the Flink side here, because we're not able to override the thread factory
for the Kafka I/O thread :/

[1] https://issues.apache.org/jira/browse/KAFKA-4228
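
For reference, the counter-based workaround Dongwon describes in the quoted
message below might look roughly like this sketch (metric names are
illustrative):

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class InstrumentedAsyncLookup extends RichAsyncFunction<String, String> {

    private transient Counter sentRequests;
    private transient Counter finishedRequests;

    @Override
    public void open(Configuration parameters) {
        sentRequests = getRuntimeContext().getMetricGroup().counter("asyncRequestsSent");
        finishedRequests = getRuntimeContext().getMetricGroup().counter("asyncRequestsFinished");
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        sentRequests.inc();
        // Fire the request; in the success/failure callback:
        // finishedRequests.inc();
        // resultFuture.complete(java.util.Collections.singleton(response));
        // (or resultFuture.completeExceptionally(error) on failure)
    }
}
```

The number of in-flight requests can then be derived on the Prometheus side as
asyncRequestsSent minus asyncRequestsFinished.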

On Tue, Nov 9, 2021 at 7:38 AM Dongwon Kim  wrote:

> Hi David,
>
> There are currently no metrics for the async work-queue size (you should
>> be able to see the queue stats with debug logs enabled though [1]).
>
> Thanks for the input but scraping DEBUG messages into, for example,
> ElasticSearch for monitoring on Grafana is not possible in my current
> environment.
> I just defined two counters in RichAsyncFunction for tracking # sent
> requests and # finished/failed requests, respectively, and used the two
> counters to calculate the inflight requests from Prometheus.
>
> As far as I can tell from looking at the code, the async operator is able
>> to checkpoint even if the work-queue is exhausted.
>
> Oh, I didn't know that! As you pointed out and I'm going to explain below,
> the async operator might not be the source of the problem.
>
> I just hit the same situation and found that
> - # of inflight records are zero when the backpressure is getting high
> - A taskmanager complains the following error message around the time when
> the backpressure is getting high (all the others don't do):
>
>> 2021-11-09 13:20:40,601 ERROR org.apache.kafka.common.utils.KafkaThread
>>   [] - Uncaught exception in thread
>> 'kafka-producer-network-thread | producer-8':
>>
>> java.lang.OutOfMemoryError: Direct buffer memory
>>
>> at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
>>
>>         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
>> ~[?:?]
>>
>> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
>>
>> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242) ~[?:?]
>>
>> at sun.nio.ch.IOUtil.write(IOUtil.java:164) ~[?:?]
>>
>> at sun.nio.ch.IOUtil.write(IOUtil.java:130) ~[?:?]
>>
>> at
>> sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493) ~[?:?]
>>
>> at java.nio.channels.SocketChannel.write(SocketChannel.java:507)
>> ~[?:?]
>>
>> at
>> org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:60)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.kafka.common.network.KafkaChannel.send(KafkaChannel.java:429)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:399)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:589)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>
>> at java.lang.Thread.run(Thread.java:829) [?:?]
>>
>
> Can it be the reason why my pipeline is stalled and ends up with the
> checkpoint timeout? I guess all the upstream tasks might fail to send data to
> the failed kafka producer and records are stacking up in buffers, which
> could result in the back-pressure. If so, is there no mechanism in Flink to
> detect such an error and send it to the job manager for debugging purposes?
>
> Best,
>
> Dongwon
>
>
> On Mon, Nov 8, 2021 at 9:21 PM David Morávek  wrote:
>
>> Hi Dongwon,
>>
>> There are currently no metrics for the async work-queue size (you should
>> be able to see the queue stats with debug logs enabled though [1]). As far
>> as I can tell from looking at the code, the async operator is able to
>> checkpoint even if the work-queue is exhausted.
>>
>> Arvid can you please validate the above? (the checkpoints not being
>> blocked by the work queue part)
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.14.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java#L109
>>
>> Best,
>> D.
>>
>> On Sun, Nov 7, 2021 at 10:41 AM Dongwon Kim 
>> wrote:
>>
>>> Hi community,
>>>
>>> While using Flink's async i/o for interacting with an external system, I
>>> got the following exception:
>>>
>>> 2021-11-06 10:38:35,270 INFO  
>>>