SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
Hi Lorenzo,
From a previous thread [1] and the source code, I think you can set
inputFormat.setNestedFileEnumeration(true) to also scan nested files.
Best,
Yun
[1]
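Looks good! 1. Record the time data enters and leaves Flink. If the timestamps are generated the same way, the overhead for Flink and Spark is at least the same, so the results are comparable. There will be some performance impact, but not much. 2. You could try varying
the data size in bytes, operators, windows, resources, and scenarios (simple ETL, dimension-table JOIN, dual-stream join, triggering backpressure, complex SQL computation); SQL has standard benchmarks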
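On 2020-06-12 10:49:26, "Zhonghan Tang" <13122260...@163.com> wrote:
>Hi,
>Recently I need to do a throughput/latency performance analysis of Flink/SparkStreaming. I found very little material online on this; only Meituan (2017) and Yahoo (2015)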
Hi Vijay,
The memory configurations in Flink 1.9 and previous versions are indeed
complicated and confusing. That is why we made significant changes to it in
Flink 1.10. If possible, I would suggest upgrading to Flink 1.10, or the
upcoming Flink 1.11 which is very likely to be released in this
Yes, I pasted the wrong code; please refer to the link below.
> On Jun 12, 2020, at 11:55, Zhou Zach wrote:
>
> Thanks for the reply. However, according to the documentation
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html
> it only works with the Blink planner, right?
>
> On 2020-06-12 11:49:08, "Leonard Xu" wrote:
>> Hi,
>>
Thanks for the reply. However, according to the documentation
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html
it only works with the Blink planner, right?
On 2020-06-12 11:49:08, "Leonard Xu" wrote:
>Hi,
>This document was probably missed when it was updated for 1.9; I'll create an issue to fix it. For now you can use it like this [1]:
>StreamExecutionEnvironment env =
Hi,
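This document was probably missed when it was updated for 1.9; I'll create an issue to fix it. For now you can use it like this [1]:
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
    .useOldPlanner()
    .inStreamingMode()
    .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);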
You could try a proxy user instead, so you don't have to add all the keytabs.
--- Original message ---
From: "zhangjunjie1...@163.com"
flink version 1.10.0
According to the documentation
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/temporal_tables.html#defining-temporal-table
I want to define a temporal table, but I can't find getTableEnvironment.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv =
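In a Flink 1.9 environment, we are building a Flink-on-YARN platform with tenant/resource isolation between users. To run Flink in
per-job mode in a Kerberos environment, we need to add the following to flink-conf.yaml: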
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /home/flink/p_zhangjunjie.keytab
security.kerberos.login.principal: p_zhangjun...@local.com
Hi Clay,
Could you verify that the "taskmanager.sh" you use is the same script shipped with
Flink 1.10.1, or whether a custom script is used? Also, does the jar file
"bash-java-utils.jar" exist in your Flink bin directory?
In Flink 1.10, the memory configuration for a TaskManager works as follows.
-
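We recently ran a performance comparison based on Beam Nexmark [1], which you can refer to.
Unlike the Beam benchmark, we rewrote the scenarios described by each test case with each engine's own API, instead of writing all test cases with the
Beam API and translating them onto multiple runners as is done there.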
[1] https://beam.apache.org/documentation/sdks/java/testing/nexmark/
Best,
Kurt
On Fri, Jun 12, 2020 at 10:49 AM Zhonghan Tang <13122260...@163.com> wrote:
>
Hi, this feature has already been implemented in Flink, see [1]; it is supported starting from 1.11.0.
[1] https://issues.apache.org/jira/browse/FLINK-15220
---
Best,
Matt Wang
On 06/12/2020 10:37,Kyle Zhang wrote:
Hi,
kafka connector
Hi,
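Recently I need to do a throughput/latency performance analysis of Flink/SparkStreaming. I found very little material online on this; only Meituan (2017) and Yahoo (2015)
did similar analyses. My questions:
1. For simple Kafka-to-Kafka reading and writing, how do I record when data enters and leaves Flink? If I add timestamps, how should I add them, and will that affect performance?
2. The scenarios I have in mind are simple ETL inserts, basic filtering, and windows. Are there other qualitative or quantitative ways to measure framework performance?
Meituan link: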
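The current version doesn't support it; I see that 1.11 does. You could also modify the code yourself to support it.
--- Original message ---
From: "Kyle Zhang"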
Hi,
Can the Kafka connector
DDL start consuming from a specified timestamp, like flinkKafkaConsumer.setStartFromTimestamp(xx)? The documentation only mentions earliest-offset, latest-offset, group-offsets, and specific-offsets.
CREATE TABLE MyUserTable (
...
) WITH (
'connector.type' = 'kafka',
'connector.version' = '0.11', -- required:
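> How do I tell whether code runs in the Client, JobManager, or TaskManager?
In YARN per-job mode, the user's main method runs on the client, and the compiled JobGraph is shipped to the JobManager for
task scheduling. As I understand it, user code generally does not run on the JobManager; the JM schedules tasks onto the TaskManagers
to run. In YARN application mode, the user's main runs on the JobManager to generate the JobGraph; everything else is the same.
> Is the mainClass in jarB already running on YARN, or on the server side?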
Hi ZheFu,
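Could you tell us your Flink version? My rough understanding is this: every time the sink runs snapshotState, it checks whether all of the sink's
data has actually been written to Kafka.
In other words, at the time of this checkpoint, because your checkpoint interval is short, Kafka has not yet finished acknowledging
the records, which is why you see this problem. I suggest making the checkpoint interval longer.
For the specific code, see the method FlinkKafkaProducerBase.snapshotState.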
Best,
LakeShen
Congxian Qiu
Hey all,
Adding to Aaron's response, we use Bazel to build our Flink apps. We've
open-sourced some of our setup here [1], though it's a bit outdated. There are
definitely rough edges, and it probably needs a good deal of work to fit other
setups. We have written a wrapper around the `java_library` and
Hi Dan,
We use Bazel to compile our Flink applications. We're using "rules_scala" (
https://github.com/bazelbuild/rules_scala) to manage the dependencies and
produce jars. We haven't had any issues. However, I have found that
sometimes it's difficult to figure out exactly what Flink target or
I took the Flink playground and I'm trying to swap out Maven for Bazel. I
got to the point where I'm hitting the following error. I want to diff my
code with an existing, working setup.
Thanks! - Dan
client_1|
org.apache.flink.client.program.ProgramInvocationException:
Neither
Hi John,
I think that using different plugins is not going to be an issue,
assuming that the schemes of your file systems do not collide. This is already
the case for S3 within Flink, where we have 2 implementations, one
based on Presto and one based on Hadoop. For the first you can use the
scheme s3p
Hi flink fans,
I'm hoping for an easy solution. I'm trying to upgrade my Flink 1.9.3 cluster to
Flink 1.10.1, but I'm running into memory configuration errors.
Such as:
Caused by: org.apache.flink.configuration.IllegalConfigurationException:
The network memory min (64 mb) and max (1 gb) mismatch, the
Hello,
I upgraded Flink from 1.7 to 1.10 in Kubernetes. When the job manager is
launched, the following exception occurs. If I do some cleanup in
ZooKeeper and restart, it works. Any ideas about this error, and what
needs to be done without having to clean up ZooKeeper?
ERROR
So I think that will work, but it has some limitations. Namely, when
launching clusters through a service (which is our use case), it can be the
case that multiple different clients want clusters with different plugins
or different versions of a given plugin, but because the
FlinkClusterDescriptor
I'm not the original poster, but I'm running into this same issue. What you
just described is exactly what I want. I presume you guys are using some
variant of this helm
https://github.com/docker-flink/examples/tree/master/helm/flink to configure
your k8s cluster? I'm also assuming that this
Hi,
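> Do you mean putting all those jars under flink/lib?
Looking at this error, the exception is thrown when the Flink client submits the job; it looks like the machine your client runs on is missing the HBase jars.
Since you are using an HBase cluster, there must be a Hadoop cluster in production, and some HBase classes depend on Hadoop dependencies. So add the jars under HBase's lib directory to
$HADOOP_CLASSPATH; then $HADOOP_CLASSPATH contains all the jars HBase needs, and in Flink's lib directory you only need to add
flink-hbase_2.11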
Hi,
related to the same case I am discussing in another thread, but not related
to AVRO this time :)
I need to ingest files that an S3 Sink Kafka Connector periodically adds to an S3
bucket.
Files are bucketed by date/time, as often happens.
Is there any way, using Flink only, to monitor a
Hi Arvid,
I followed your instructions for the breakpoint in
SpecificDatumReader.readField *with AVRO 1.8.2*,
For all timestamp-millis fields (I have many),
Conversion conversion = ((SpecificRecordBase) r).getConversion(f.pos());
returns null.
Interestingly, it appears to have been related to the stream application design
that was causing incremental checkpointing issues. Once the checkpoints started
failing, they would cause a positive feedback loop of failure as more and more
data built up to write, and other exceptions would pop
Hi,
I'm new to Flink, but after reading the documentation:
what would be the best approach to stream data from a restored state
following a job restart?
Say I have a MapState that gets populated during streaming with various
computed results within a stateful operator.
Upon job restart or on
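Hi Flink community,
We are currently investigating a cross-datacenter disaster recovery scheme for checkpoints. The goal is to back up checkpoints to hdfs2 so that when hdfs1 fails, Flink jobs can be quickly restored from hdfs2.
We originally planned to use Hadoop's distcp command to periodically copy checkpoints from hdfs1 to hdfs2, but our ops colleagues reported that distcp fails because files under the checkpoint path change too frequently.
1. When files are created this frequently, how do you do incremental file copies between HDFS clusters? Is there an existing tool or a mature solution?
2. Is modifying the Flink code to support asynchronous dual writes of checkpoints a feasible approach?
Best,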
Sorry, forget my last mail; that was half-finished.
Here is the real one:
Hi Lorenzo,
if you still have time to investigate.
Your stack trace shows that all expected code paths have been taken.
Conversions are there, although they look different than here; that can
be attributed to the Avro
Hi Lorenzo,
if you still have time to investigate.
Your stack trace shows that all expected code paths have been taken.
Conversions are there, although they look different than here; that can
be attributed to the Avro upgrade.
@Override
protected void readField(Object r, Schema.Field f,
There are no immediate plans, mostly because timers are fairly expensive
and represent an easy trap for trashing performance or invalidating
benchmark results.
On 11/06/2020 13:11, Vinay Patil wrote:
Ohh Okay, basically implement the Gauge and add timer functionality to
it for now.
Is there
Thanks Guowei,
setting format.setReuseAvroValue(false) with 1.8.2-generated records does
not solve the problem.
12:02:59,314 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
checkpoint 1 of job 46ea458aff2a496c4617a6b57e4de937.
java.lang.ClassCastException:
Ohh Okay, basically implement the Gauge and add timer functionality to it
for now.
Is there a plan or JIRA ticket to add a Timer metric in a future release? I
think it would be good to have.
Regards,
Vinay Patil
On Wed, Jun 10, 2020 at 5:55 PM Chesnay Schepler wrote:
> You cannot add custom metric
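>Are there any requirements for the checkpoint configuration?
Set it to EXACTLY_ONCE.
>Also, how often are Kafka transactions committed? Is that configurable?
They are committed on checkpoint. If you dig deeper, there are some concurrency subtleties here; see the details of TwoPhaseCommitSink.
As for configuring it... you could customize it if you're able to, but why would you want to?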
Best,
tison.
蒋佳成(Jiacheng Jiang) <920334...@qq.com> wrote on Thu, Jun 11, 2020 at 4:59 PM:
> The transaction is committed in the checkpoint-complete notification.
#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x7fc2dd3d045e, pid=409, tid=0x7fc2d46f4700
#
# JRE version: OpenJDK Runtime Environment (8.0_222-b10) (build 1.8.0_222-b10)
# Java VM: OpenJDK
Hello,
I'm writing a test for my custom sink function. The function is stateful
and relies on checkpoint restores for maintaining consistency with the
external system that it's writing to. For integration testing of the sink
function, I have a MiniCluster based environment inside a single JVM
Hi,
For 1.8.2 (case 1), you could try format.setReuseAvroValue(false);
Best,
Guowei
Lorenzo Nicora wrote on Thu, Jun 11, 2020 at 5:02 PM:
> Hi Arvid,
>
> thanks for the point about catching records. Gotcha!
>
> Sorry I cannot share the full schema or generated code. It's a 3rd party
> IP and we signed
Hi Arvid,
thanks for the point about catching records. Gotcha!
Sorry, I cannot share the full schema or generated code. It's 3rd-party IP
and we signed a meter-thick NDA... I think I can post snippets.
The schema is heavily nested, including arrays of other record types.
Types are primitives, or
checkpoint
----
??: "??"
Hi
exactly-once
checkpoint
kafka
??
----
From: "Matt Wang"
Hi
broadcast??
Best,
Yichao Yang
----
From: "xue...@outlook.com"
Join with MySQL, then group by:
Table A(id, ip)
MySQL table B(startip, endip, area_id)
Match A.ip between (B.startip, B.endIp) to get area_id,
then aggregate by area_id.
How can this be written in SQL?
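A sketch of the lookup behind `A.ip BETWEEN B.startip AND B.endip`, assuming table B is small enough to hold in memory on every parallel instance (for example via broadcast state) and that its ranges do not overlap. IPv4 addresses are converted to numbers and the ranges are kept in a `TreeMap` keyed by start address, so each lookup is a single `floorEntry` call instead of a scan over B. `IpRangeLookup` is a hypothetical helper; grouping by `area_id` would then be an ordinary aggregation on the enriched stream.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory copy of B(startip, endip, area_id): non-overlapping,
 * inclusive IPv4 ranges keyed by start address for O(log n) lookups.
 */
final class IpRangeLookup {
    private static final class Range {
        final long end;
        final int areaId;

        Range(long end, int areaId) {
            this.end = end;
            this.areaId = areaId;
        }
    }

    private final TreeMap<Long, Range> byStart = new TreeMap<>();

    /** Dotted-quad IPv4 string to its unsigned 32-bit value, held in a long. */
    static long ipToLong(String ip) {
        String[] parts = ip.split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("not an IPv4 address: " + ip);
        }
        long value = 0;
        for (String part : parts) {
            int octet = Integer.parseInt(part);
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("bad octet in " + ip);
            }
            value = (value << 8) | octet;
        }
        return value;
    }

    void addRange(String startIp, String endIp, int areaId) {
        byStart.put(ipToLong(startIp), new Range(ipToLong(endIp), areaId));
    }

    /** area_id of the range containing ip, or null if no range matches. */
    Integer areaOf(String ip) {
        long v = ipToLong(ip);
        Map.Entry<Long, Range> e = byStart.floorEntry(v); // greatest start <= v
        return (e != null && v <= e.getValue().end) ? e.getValue().areaId : null;
    }
}
```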
Hi,
I wrote a test for case 1, but it does not throw any exception. I used
the org.apache.flink.formats.avro.generated.JodaTimeRecord for the test.
Could you check whether AccountEntries.class has the following code:
private static final org.apache.avro.Conversion[] conversions =
new
Hi,
I wrote a test for case 1, but it does not throw any exception. I used
the org.apache.flink.formats.avro.generated.JodaTimeRecord for the test.
Best,
Guowei
Arvid Heise wrote on Thu, Jun 11, 2020 at 3:58 PM:
> Hi Lorenzo,
>
> I'm glad that it worked out somehow, but I'd still like to understand what
>
Hi Lorenzo,
I'm glad that it worked out somehow, but I'd still like to understand what
went wrong, so it will work more smoothly for future users. I double
checked and we even test AvroSerializer with logical types, so I'm a bit
puzzled.
Could you attach GlHeader or at least show us how
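Environment: Flink 1.10 standalone cluster
Reason: for data processing, we need to dynamically add OutputTags to split the data stream.
That is, for business reasons, some A data must be computed first before it can be used: the result stream produced by combining the B, C, and D streams can only use the A stream after that. But A changes over time.
Goal: apart from preloading data in open() of a Rich function (i.e., loading it before other data is processed), is there any other way?
In Flink stream processing there doesn't seem to be a good way to implement result dependencies between streams.
I tried processing A as a stream and processing {B, C, D} as streams, using A's result stream in Function_XXX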
yarn??java -cp??
zjfpla...@hotmail.com
Yichao Yang
?? 2020-06-11 15:53
user-zh
?? ??flink on yarn??
Hi
yarnyarnyarn??
Best,
Yichao Yang
Hi
yarnyarnyarn??
Best,
Yichao Yang
----
From: "zjfpla...@hotmail.com"
Hi,
The stack trace indicates that your query schema does not match your
sink schema. It seems that `active_ratio*25 score` in your query is a
double value, not the `ROW` type you declared in your sink.
op <520075...@qq.com> wrote on Thu, Jun 11, 2020 at 3:31 PM:
> hi
> flink1.10,wen i want to sink data to
hi,
You should make sure the types of the selected fields and the types of the sink
table are the same;
otherwise you will get the above exception. You can change `active_ratio*25
score` to a row type, like this:
insert into circle_weight select rowkey, ROW(info) from (
select
hi
flink1.10: when I want to sink data to an HBase table like this:
bstEnv.sqlUpdate("""CREATE TABLE circle_weight (
rowkey String,
info ROW
Hi Arvid,
answering your other questions:
Here is the stacktrace of the case (1), when I try to read using specific
records generated by the AVRO 1.8.2 plugin
java.lang.ClassCastException: java.lang.Long cannot be cast to
org.joda.time.DateTime
at
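Do you mean putting all those jars under flink/lib?
On 2020-06-11 14:39, Leonard Xu wrote:
Hi
Have you configured hadoop_classpath on your server? When trying out HBase, I recommend using hadoop_classpath + the flink-hbase
jar; otherwise dependency issues will be troublesome.
Best regards,
Leonard Xu
On Jun 11, 2020, at 14:24, 酷酷的浑蛋 wrote:
When writing to HBase with a Flink SQL DDL statement, I get the following error: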
java.lang.NoClassDefFoundError:
Hi Arvid,
Thanks for the quick reply.
The second reference link (
http://codespeed.dak8s.net:8000/timeline/?ben=asyncWait.ORDERED=2) from
your answer is not accessible though. Could you share some more numbers
from it? Are these benchmarks published somewhere?
Without actual IO call, Async IO
??Blinkplanner??oldplanner??1.10
package test.table.sql
import java.util.Properties
import com.souhu.msns.huyou.PublicParams
import com.souhu.msns.huyou.utils.KafkaPbSchema
import org.apache.flink.api.common.time.Time
import
Hi,
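Can you reproduce it reliably? Please post your Flink version and your case, and I can help follow up and confirm.
Best,
Leonard Xu
> On Jun 11, 2020, at 14:30, op <520075...@qq.com> wrote:
>
> Hi all, I recently ran into a problem.
> With the same code, after setting IdleStateRetentionTime with the oldPlanner, the state of both sides of the join is cleaned up periodically, but when I run the same code with the blinkplanner, the state is never cleaned up. Is this a bug?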
Hi
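Have you configured hadoop_classpath on your server? When trying out HBase, I recommend using hadoop_classpath + the flink-hbase
jar; otherwise dependency issues will be troublesome.
Best regards,
Leonard Xu
> On Jun 11, 2020, at 14:24, 酷酷的浑蛋 wrote:
>
> When writing to HBase with a Flink SQL DDL statement, I get the following error: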
> java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration
>at
>
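With the same code, after setting IdleStateRetentionTime with the oldPlanner, the state of both sides of the join is cleaned up periodically, but with the blinkplanner it is not. Is this a bug?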
Hi,
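I have a few questions from using Flink:
1. Flink is divided into a jobmanager and taskmanagers; how do I tell which code runs on the jobmanager and which on the taskmanager?
2. Suppose that in jarA I use AbstractYarnClusterDescriptor.deployJobCluster() instead of the flink
run command (I want to submit Flink jobs to YARN directly by launching a jar) to deploy jarB to YARN, and the mainClass in jarB uses StreamExecutionEnvironment.execute to run the streaming job, via java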
Hi Igal, thanks for your help.
If I understood correctly, the Flink deployments (not the functions) need
to use the same image, right? That means the Flink master and all
workers still need to use the same image, which includes the module.yaml and
the jar with embedded modules of the full
Thanks
On 2020-06-11 14:10:53, "Leonard Xu" wrote:
>Hi,
>
>The JDBC connector did not support unsigned types before; an unsigned type can hold larger values than the corresponding signed type.
>bigint(20) unsigned (range is 0 to 18446744073709551615) exceeds the range of bigint (range is
>-9223372036854775808 to 9223372036854775807).
>
>
When writing to HBase with a Flink SQL DDL statement, I get the following error:
java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration
at
org.apache.flink.addons.hbase.HBaseUpsertTableSink.consumeDataStream(HBaseUpsertTableSink.java:87)
at
Hi,
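The JDBC connector did not support unsigned types before; an unsigned type can hold larger values than the corresponding signed type.
bigint(20) unsigned (range is 0 to 18446744073709551615) exceeds the range of bigint (range is
-9223372036854775808 to 9223372036854775807).
The latest code has fixed this issue [1]. You can try it after 1.11 is released, or build the latest code; in Flink, declare the corresponding column as decimal(20, 0).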
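To make the range argument concrete, the sketch below (plain `java.math`, no Flink involved) checks that the MySQL `BIGINT UNSIGNED` maximum overflows a signed 64-bit long, which is what Flink's `BIGINT` holds, but fits within the 20 digits of `DECIMAL(20, 0)`. `UnsignedBigintRange` is a hypothetical helper for illustration only.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

/**
 * Hypothetical helper: compares the BIGINT UNSIGNED range with a signed
 * 64-bit BIGINT and with DECIMAL(20, 0).
 */
final class UnsignedBigintRange {
    static final BigInteger UNSIGNED_BIGINT_MAX = new BigInteger("18446744073709551615");

    private UnsignedBigintRange() {}

    /** True if v fits a signed 64-bit long, i.e. a signed BIGINT. */
    static boolean fitsSignedBigint(BigInteger v) {
        return v.compareTo(BigInteger.valueOf(Long.MIN_VALUE)) >= 0
                && v.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) <= 0;
    }

    /** True if the integer v has at most 20 digits, i.e. fits DECIMAL(20, 0). */
    static boolean fitsDecimal20Scale0(BigInteger v) {
        return new BigDecimal(v).precision() <= 20;
    }

    /** Reads a raw 64-bit pattern as an unsigned value. */
    static BigInteger fromUnsignedLong(long bits) {
        return new BigInteger(Long.toUnsignedString(bits));
    }
}
```

For instance, the all-ones bit pattern (`-1L` as a signed long) read as unsigned is exactly the `BIGINT UNSIGNED` maximum, which is why storing it in a signed `BIGINT` column cannot work.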
Best regards,
Leonard Xu
[1]