Re: Choosing the right Flink state for the business requirements

2021-01-22 Thread ??????
TTL??keybykey1state?? ---- From: "https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows" news_...@163.com

Looking for a simple example of writing ORC files with Flink, specifically how to handle the complex Map type.

2021-01-22 Thread 赵一旦
Currently I use a custom OrcBulkWriterFactory to obtain the individual ColumnVectors and set values on them. For simple types the API looks clear, but I can't figure out how to write a Map type. For example, serverTime is an INT, so setting vector[rowId] directly works. But how do I populate a MapColumnVector, i.e. how exactly do I write multiple key-value pairs into it? serverTimeColumnVector.vector[rowId] = ele.getTimestamp(); MapColumnVector dColumnVector = (MapColumnVector)
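For what it's worth, MapColumnVector (like ListColumnVector) stores every row's entries flattened into two child vectors, keys and values, while offsets[rowId] and lengths[rowId] mark each row's slice and childCount tracks the next free child slot. The sketch below models that layout in plain Java with simple arrays standing in for the real ORC classes, so the child-vector types here are illustrative, not the actual API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Models how ORC's MapColumnVector lays out map rows: all entries are
 * flattened into child key/value storage, and offsets[row]/lengths[row]
 * mark each row's slice of that storage.
 */
public class MapVectorSketch {
    // modeled after the MultiValuedColumnVector fields
    long[] offsets = new long[1024];
    long[] lengths = new long[1024];
    int childCount = 0;
    // stand-ins for the key/value child vectors (here: string keys, long values)
    String[] keys = new String[4096];
    long[] values = new long[4096];

    /** Append one row's map, the way MapColumnVector writing works. */
    void writeRow(int rowId, Map<String, Long> map) {
        offsets[rowId] = childCount;   // this row starts at the next free child slot
        lengths[rowId] = map.size();   // this row owns this many entries
        for (Map.Entry<String, Long> e : map.entrySet()) {
            keys[childCount] = e.getKey();
            values[childCount] = e.getValue();
            childCount++;
        }
    }

    /** Read one row's map back out of the flattened child storage. */
    Map<String, Long> readRow(int rowId) {
        Map<String, Long> out = new LinkedHashMap<>();
        int start = (int) offsets[rowId];
        for (int i = 0; i < lengths[rowId]; i++) {
            out.put(keys[start + i], values[start + i]);
        }
        return out;
    }

    public static void main(String[] args) {
        MapVectorSketch v = new MapVectorSketch();
        Map<String, Long> row0 = new LinkedHashMap<>();
        row0.put("a", 1L);
        row0.put("b", 2L);
        v.writeRow(0, row0);
        Map<String, Long> row1 = new LinkedHashMap<>();
        row1.put("c", 3L);
        v.writeRow(1, row1);
        System.out.println(v.readRow(0)); // {a=1, b=2}
        System.out.println(v.readRow(1)); // {c=3}
    }
}
```

With the actual ORC API, the same steps would be: set mapVector.offsets[rowId] = mapVector.childCount and mapVector.lengths[rowId] to the entry count, then write each entry at index childCount++ into the key and value child vectors (for string keys something like ((BytesColumnVector) mapVector.keys).setVal(idx, bytes)), growing the children with ensureSize first if they may overflow. That is hedged from memory of the ORC vectorized API, so please double-check it against the MapColumnVector Javadoc.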

Re: Re: Re: Flink job fails on submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS

2021-01-22 Thread 赵一旦
Also, if anyone knows how to write Map-typed fields into ORC files, please share an example. As below, once I have the MapColumnVector, how do I write to it? The simple non-Map fields are all clear enough; I just set xxxColumnVector.vector[rowId]. But the MapColumnVector API is messy and I can't work out how to use it. MapColumnVector dColumnVector = (MapColumnVector) batch.cols[2]; 赵一旦 wrote on Sat, Jan 23, 2021 at 1:42 PM: > Solved. I overrode this part of the Flink source code to remove the restriction on non-HDFS schemes. > > 张锴

Re: Re: Re: Flink job fails on submission when writing to HDFS: Recoverable writers on Hadoop are only supported for HDFS

2021-01-22 Thread 赵一旦
Solved. I overrode this part of the Flink source code to remove the restriction on non-HDFS schemes. 张锴 wrote on Thu, Jan 21, 2021 at 7:35 PM: > @赵一旦 > Also, I asked you another question last time; I tried the idea you suggested, but it seems to have some problem, could you take a look > > 张锴 wrote on Thu, Jan 21, 2021 at 7:13 PM: > > I'm on Flink 1.10; FileSink here means BucketingSink, which is what I use to write to HDFS > > > > 赵一旦 wrote on Thu, Jan 21, 2021 at 7:05 PM: > > > >> @Michael Ran; OK, no worries. > >> > >> @张锴

Re: In Flink's StreamingFileSink and the FileSink introduced in 1.12, BucketsBuilder's createBucketWriter only supports RecoverableWriter.

2021-01-22 Thread 赵一旦
Solved. Rewriting the Flink source to override this restriction did the trick. 赵一旦 wrote on Fri, Jan 22, 2021 at 10:17 AM: > As the subject says, why is only RecoverableWriter supported? What if the file system I use doesn't support it; do I have to write a custom sink? > > I'm using a large distributed file system developed in-house that supports the Hadoop protocol (though I'm not sure every feature is supported); at the moment neither StreamingFileSink nor FileSink seems able to write to it. > > Not sure whether there are other problems, but right now I'm stuck on this recoverable requirement. > > The error says only HDFS supports RecoverableWriter. > >

Re: The company forcibly split the Orders main table; asking the experts for advice on the fact-table join.

2021-01-22 Thread 徐州州
=_= OK, thank you! ---- From: "yang nick"

Re: The company forcibly split the Orders main table; asking the experts for advice on the fact-table join.

2021-01-22 Thread yang nick
I feel this is hard to do in real time; it involves an unbounded stream with state updates, and you'd need to configure state TTL. 徐州州 <25977...@qq.com> wrote on Sat, Jan 23, 2021 at 11:23 AM: > > My difficulty is that to get the payAment field for rejected orders I have to scan the entire order_money table. order_money rows are only produced when an order is placed; for a rejected order I have no idea of its order time and no way to fetch it, and order_money carries no marker of any kind, so a full scan of the money table OOMs my program. My data comes in through Canal, and I need to write Flink SQL to do the join. > > > >

Re: Does flink-sql-gateway support remote clusters?

2021-01-22 Thread yang nick
You could try Zeppelin. 罗显宴 <15927482...@163.com> wrote on Sat, Jan 23, 2021 at 11:19 AM: > > Hi all, I've tried both the Flink SQL client and flink-sql-gateway and found that both submit the code to the local cluster; it seems impossible to submit a job to a specified remote cluster. How do I submit code to run on a remote cluster? > > > | | > 15927482803 > | > | > Email: 15927482...@163.com > | > > Signature customized by NetEase Mail Master

Re: The company forcibly split the Orders main table; asking the experts for advice on the fact-table join.

2021-01-22 Thread yang nick
In my view, since you're building a data warehouse there's nothing to agonize over here: the data is already in the warehouse, so just build a wide table. 徐州州 <25977...@qq.com> wrote on Sat, Jan 23, 2021 at 11:11 AM: > > I had the same idea at first, but the business side told me the order_reject table stands alone. Rejected orders only ever appear in order_reject, while order_money is written only when the order is placed; when rejected-order data arrives, order_money doesn't change at all. So even if I take each day's rejected orders to join, I still have to read the full order_money. > > > > > --Original message-- >

Re: The company forcibly split the Orders main table; asking the experts for advice on the fact-table join.

2021-01-22 Thread 徐州州
The order_reject table stands alone; rejected orders only appear in order_reject, while order_money is written only at order time, so the join still has to read the full order_money. ---- From:

Re: Kubernetes HA Services - artifact for KubernetesHaServicesFactory

2021-01-22 Thread Ashish Nigam
Yang, It worked finally... after adding kubernetes related config in configmap. On Thu, Jan 21, 2021 at 7:13 PM Yang Wang wrote: > You could set config option "kubernetes.namespace" to your flink-conf > ConfigMap. And then > KubernetesHAService will use it to create/watch the ConfigMap. Please
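For anyone hitting the same issue, a flink-conf.yaml sketch of the Kubernetes HA settings involved; the cluster id, namespace, and storage path below are placeholders (option names as of Flink 1.12, worth verifying against the configuration docs):

```yaml
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://my-bucket/flink-ha   # placeholder path
kubernetes.cluster-id: my-flink-cluster                 # placeholder
kubernetes.namespace: my-namespace                      # the option discussed above
```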

Re: What is the best way to have a cache of an external database in Flink?

2021-01-22 Thread David Anderson
I provided an answer on stackoverflow, where I said the following: A few different mechanisms in Flink may be relevant to this use case, depending on your detailed requirements. *Broadcast State* Jaya Ananthram has already covered the

Re: A few questions about minibatch

2021-01-22 Thread Rex Fenley
Hello, Does anyone have any more information here? Thanks! On Wed, Jan 20, 2021 at 9:13 PM Rex Fenley wrote: > Hi, > > Our job was experiencing high write amplification on aggregates so we > decided to give mini-batch a go. There's a few things I've noticed that are > different from our

Re: What is the best way to have a cache of an external database in Flink?

2021-01-22 Thread Jan Oelschlegel
But then you need a way to consume a database as a DataStream. I found this one: https://github.com/ververica/flink-cdc-connectors. I want to implement a similar use case, but I don't know how to parse the SourceRecord (which comes from the connector) into a POJO for further processing. Best,

Re: A question about using Flink's yarnship

2021-01-22 Thread 叶贤勋
URL url = this.getClass().getClassLoader().getResource("conf"); String dir = url.getFile(); The shipped config files should be under that dir; give it a try. On Jan 22, 2021 at 15:38, Yan Tang wrote: I separated the config from the jar and used the -yt option to ship the config files to the YARN cluster, but I can never get hold of them at runtime. Does anyone have experience with this? My submit command: -yt /path/to/conf code:

DataStream API: Best way for reading csv file

2021-01-22 Thread Jan Oelschlegel
Hi, I'm looking for a convenient way to read a CSV file with the DataStream API in Flink 1.11, without going through the Table/SQL API first. This is my first approach: val typeInfo = TypeInformation.of(classOf[CovidEvent]).asInstanceOf[PojoTypeInfo[CovidEvent]] val csvInputFormat = new

Re: [Flink SQL] CompilerFactory cannot be cast error when executing statement

2021-01-22 Thread Sebastián Magrí
Thanks a lot Matthias! In the meantime I'm trying out something with the scala quickstart. On Fri, 22 Jan 2021 at 17:12, Matthias Pohl wrote: > Ok, to be fair, I just did some research on the error message and didn't > realize that you're working with binaries only. > > I tried to set it up

Re: [Flink SQL] CompilerFactory cannot be cast error when executing statement

2021-01-22 Thread Matthias Pohl
Ok, to be fair, I just did some research on the error message and didn't realize that you're working with binaries only. I tried to set it up on my machine to be able to reproduce your error. Unfortunately, I wasn't able to establish the connection between Flink and Postgres using your

Re: What is the best way to have a cache of an external database in Flink?

2021-01-22 Thread Selvaraj chennappan
Hi, Perhaps broadcast state is natural fit for this scenario. https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html Thanks, Selvaraj C On Fri, 22 Jan 2021 at 8:45 PM, Kumar Bolar, Harshith wrote: > Hi all, > > The external database consists of a

Re: [Flink SQL] CompilerFactory cannot be cast error when executing statement

2021-01-22 Thread Sebastián Magrí
Hi Matthias! I went through that thread but as I'm just using the `apache/flink` docker image for testing I honestly couldn't figure out how I would do that since I don't have a pom file to edit. If it's possible to do it through the configuration I'd be glad if you could point me out in the

Re: [Flink SQL] CompilerFactory cannot be cast error when executing statement

2021-01-22 Thread Matthias Pohl
Hi Sebastián, have you tried changing the dependency scope to provided for flink-table-planner-blink as it is suggested in [1]? Best, Matthias [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-1-10-exception-Unable-to-instantiate-java-compiler-td38221.html On Fri, Jan 22,

What is the best way to have a cache of an external database in Flink?

2021-01-22 Thread Kumar Bolar, Harshith
Hi all, The external database consists of a set of rules for each key, these rules should be applied on each stream element in the Flink job. Because it is very expensive to make a DB call for each element and retrieve the rules, I want to fetch the rules from the database at initialization
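Independent of which Flink mechanism ends up fitting best, the pattern described, fetch all rules once at initialization and refresh on an interval, can be sketched in plain Java. The rule type, loader signature, and refresh interval below are illustrative, and inside a Flink RichFunction you would create this in open() and close it in close():

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Caches per-key rules fetched in bulk and refreshed on a timer,
 * so that per-element processing never makes a database call.
 */
public class RulesCache implements AutoCloseable {
    private final ConcurrentHashMap<String, List<String>> rules = new ConcurrentHashMap<>();
    private final Supplier<Map<String, List<String>>> loader; // stands in for the real DB query
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public RulesCache(Supplier<Map<String, List<String>>> loader, long refreshSeconds) {
        this.loader = loader;
        refresh(); // eager bulk load at initialization
        scheduler.scheduleAtFixedRate(this::refresh, refreshSeconds, refreshSeconds, TimeUnit.SECONDS);
    }

    private void refresh() {
        // merge the latest snapshot in (deletions are not handled in this sketch)
        rules.putAll(loader.get());
    }

    /** Cheap in-memory lookup used on every stream element. */
    public List<String> rulesFor(String key) {
        return rules.getOrDefault(key, List.of());
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Broadcast state (as suggested elsewhere in this thread) is the more idiomatic choice when the rules themselves arrive as a stream, since it keeps the cache consistent across restarts; the timer-based cache above is the simpler option when the database can only be polled.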

[Flink SQL] CompilerFactory cannot be cast error when executing statement

2021-01-22 Thread Sebastián Magrí
Hi! I'm trying out Flink SQL with the attached docker-compose file. It starts up and then I create a table with the following statement: CREATE TABLE mytable_simple ( `customer_id` INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://pgusr:pgpwd@postgres/pdgb', 'table-name' =
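One thing worth checking independently of the CompilerFactory error: the PostgreSQL JDBC driver does not accept user:password@host credentials embedded in the URL, and the Flink JDBC connector takes them as separate options. A possible rewrite of the DDL, keeping the original names and leaving the truncated option as it was:

```sql
CREATE TABLE mytable_simple (
  `customer_id` INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://postgres:5432/pdgb',
  'username' = 'pgusr',
  'password' = 'pgpwd',
  'table-name' = -- value truncated in the original message
);
```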

Re: Flink 1.11 checkpoint compatibility issue

2021-01-22 Thread Arvid Heise
Hi Lu, if you are using data stream API make sure to set manual uids for each operator. Only then migrating of savepoints to other major versions of Flink is supported. [1] Best, Arvid [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html On Fri, Jan 22, 2021 at

Re: Unknown call expression: avg(amount) when use distinct() in Flink,Thanks~!

2021-01-22 Thread Timo Walther
Hi, I'm assuming you are using Flink 1.12? The exception indicates that something is definitely going wrong with the translation from Table API to optimizer nodes. We refactored a lot of the code in this region. I'll investigate the issue and come back to you once I've opened a ticket. Thanks

RE: org.apache.flink.runtime.client.JobSubmissionException: Job has already been submitted

2021-01-22 Thread Hailu, Andreas [Engineering]
Hi Robert, I appreciate you having a look. I’ll have a closer look and see what I can find. Thanks! // ah From: Robert Metzger Sent: Friday, January 22, 2021 2:41 AM To: Hailu, Andreas [Engineering] Cc: user@flink.apache.org Subject: Re:

Re: Flink 1.11 checkpoint compatibility issue

2021-01-22 Thread Matthias Pohl
Hi Lu, thanks for reaching out to the community. Interesting observation. There's no change between 1.9.1 and 1.11 that could explain this behavior as far as I can tell. Have you had a chance to debug the code? Can you provide the code so that we could look into it more closely? Another thing:

File not generated using StreamingFileSink path 1.12.0

2021-01-22 Thread Robert Cullen
I’m trying to stream data to a file on an S3 compatible system (MINIO): DataStream resultStream = tEnv.toAppendStream(log_counts, Types.ROW(Types.STRING, Types.STRING, Types.LONG)); final StreamingFileSink sink = StreamingFileSink.forRowFormat( new

Unknown call expression: avg(amount) when use distinct() in Flink,Thanks~!

2021-01-22 Thread Appleyuchi
I'm testing https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html, specifically the "Distinct aggregation on over window" section (Ctrl+F for that exact string on the linked page). Test code: distinctaggregation3.java https://paste.ubuntu.com/p/7HJ9W3hVVN/ POJO needed

Re: Re: Test failed in flink-end-to-end-tests/flink-end-to-end-tests-common-kafka

2021-01-22 Thread Matthias Pohl
Hi Smile, Have you used a clean checkout? I second Robert's statement considering that the dependency you're talking about is already part of flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml. It also has the correct scope set both in master and release-1.12. Best, Matthias On

Re: Handling validations/errors in the flink job

2021-01-22 Thread Matthias Pohl
Hi Sagar, have you had a look at CoProcessFunction [1]? CoProcessFunction enables you to join two streams into one and also provide context to use SideOutput [2]. Best, Matthias [1]

Re: JDBC connection pools

2021-01-22 Thread Matthias Pohl
Hi Marco, have you had a look into the connector documentation ([1] for the regular connector or [2] for the SQL connector)? Maybe, discussions about connection pooling in [3] and [4] or the code snippets provided in the JavaDoc of JdbcInputFormat [5] help as well. Best, Matthias [1]

[ANNOUNCE] 1.12.1 may still produce corrupted checkpoints

2021-01-22 Thread Arvid Heise
Dear users, Unfortunately, the bug in the unaligned checkpoint that we fixed in 1.12.1 still occurs under certain circumstances, such that we recommend to not use unaligned checkpoints in production until 1.12.2. While the normal processing is not affected by this bug, a recovery with corrupted
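For reference, unaligned checkpoints can be switched off cluster-wide in flink-conf.yaml while staying on 1.12.1 (option name as of Flink 1.12; verify against the configuration docs):

```yaml
# Fall back to aligned checkpoints until the fix in 1.12.2
execution.checkpointing.unaligned: false
```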

Re: Question about setNestedFileEnumeration()

2021-01-22 Thread Matthias Pohl
Hi Wayne, based on other mailing list discussion ([1]) you can assume that the combination of FileProcessingMode.PROCESS_CONTINUOUSLY and setting FileInputFormat.setNestedFileEnumeration to true should work as you expect it to work. Can you provide more context on your issue like log files? Which

Re: A question about using Flink's yarnship

2021-01-22 Thread silence
You could try specifying both -C "file:///path/to/conf/cmp_online.cfg" and -yt /path/to/conf, and then fetch it in code with this.getClass().getResourceAsStream("cmp_online.cfg") -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: A question about using Flink's yarnship

2021-01-22 Thread Yan Tang
If -yt doesn't fit my scenario, I really don't know what the option is for. In Spark I used --files and it gave me exactly the effect I wanted. -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Choosing the right Flink state for the business requirements

2021-01-22 Thread 张锴
@赵一旦 Could I add you on WeChat? I still have some problems in practice. After the window I directly call reduce(new myReduceFunc(), new AssignWindowProcessFunc()) with those two custom functions, but the result is still a bit off, and I'm not sure whether my approach is wrong. 赵一旦 wrote on Fri, Jan 22, 2021 at 10:10 AM: > As I understand it, the final MySQL result table you want is: > room ID; user ID; online time; offline time; duration = (offline time - online time); > > If user1 appears in room 1 ten times in one day, there are 10 records, each with its own duration. > > >

Re:Re: Test failed in flink-end-to-end-tests/flink-end-to-end-tests-common-kafka

2021-01-22 Thread Smile@LETTers
Yes, I've tried from both the root directory and the submodule. Neither of them works, and the error messages are the same. At 2021-01-21 23:22:12, "Robert Metzger" wrote: Since our CI system is able to build Flink, I believe it's a local issue. Are you sure that the build is failing when

Unsubscribe

2021-01-22 Thread Natasha
Unsubscribe

Re: A question about using Flink's yarnship

2021-01-22 Thread yang nick
That method reads a local file, so when you run on YARN the file won't be found. I'd suggest uploading the config to HDFS and trying that. Yan Tang wrote on Fri, Jan 22, 2021 at 4:53 PM: > I separated the config from the jar and used the -yt option to ship the config files to the YARN > cluster, but I can never get hold of them at runtime. Does anyone have experience with this? > My submit command: > -yt /path/to/conf > > code: > this.getClass().getResourceAsStream("conf/cmp_online.cfg") > but it always returns null. > > > >

Re: Flink parallelism question

2021-01-22 Thread gimlee
Parallelism has nothing to do with the number of CPU cores, and setting the slot count doesn't mean that many CPUs are used either. -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink parallelism question

2021-01-22 Thread yang nick
If it's deployed standalone on a single machine, then as far as I know there is only one TM, and one TM can have multiple slots. Jacob <17691150...@163.com> wrote on Fri, Jan 22, 2021 at 4:18 PM: > A question has puzzled me ever since I started using Flink. > > > After setting a parallelism of n, does Flink occupy n CPUs rather than n CPU cores? > > For example, when Flink consumes a Kafka > >

A question about using Flink's yarnship

2021-01-22 Thread Yan Tang
I separated the config from the jar and used the -yt option to ship the config files to the YARN cluster, but I can never get hold of them at runtime. Does anyone have experience with this? My submit command: -yt /path/to/conf code: this.getClass().getResourceAsStream("conf/cmp_online.cfg") but it always returns null. -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink parallelism question

2021-01-22 Thread Ye Chen
@jacob hi, one TaskManager slot represents one available thread with a fixed amount of memory; slots isolate memory only, not CPU. As for the relation between slots and parallelism: the slot count is the maximum concurrency a TaskManager can execute, while parallelism is the concurrency it actually uses. Personally, I'd say setting parallelism generally doesn't need to take CPUs into account. On 2021-01-22 16:18:32, "Jacob" <17691150...@163.com> wrote: >A question has puzzled me ever since I started using Flink. > > >Flink
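To make the slot/parallelism distinction concrete, here is a flink-conf.yaml fragment with illustrative numbers: the TaskManager below offers 6 slots (threads with memory isolation only), and jobs default to using 4 of them; no CPU binding is implied by either option:

```yaml
taskmanager.numberOfTaskSlots: 6   # max concurrent subtasks per TaskManager
parallelism.default: 4             # parallelism actually used when a job sets none
```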

Re: Flink parallelism question

2021-01-22 Thread 赵一旦
I don't see why you need to worry about this; set the cluster parallelism however you like. CPU core counts only matter when reasoning about ideal parallelism. Say your CPU has at most 10 cores: 20 threads can't run "in parallel", but they can run "concurrently". And if your threads do very little work, 10 of them won't even saturate 10 cores, so there is no reason to cap your parallelism at the number of CPU cores. Jacob <17691150...@163.com> wrote on Fri, Jan 22, 2021 at 4:18 PM: > A question has puzzled me ever since I started using Flink. > > > After setting a parallelism of n, does Flink occupy n CPUs rather than n CPU cores? > > For example, when Flink consumes a Kafka > >

Re: Locating a PyFlink JVM Metaspace memory leak

2021-01-22 Thread YueKun
Closing this question; it's solved. The fix was to stop submitting the jar dynamically along with the Python job via pipeline.jars, and to put it under FLINK_HOME/lib instead. -- Sent from: http://apache-flink.147419.n8.nabble.com/

Exception in thread "main" java.lang.RuntimeException: Unknown call expression: avg(amount)

2021-01-22 Thread Appleyuchi
I'm verifying the "Distinct aggregation on over window" section of https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html (Ctrl+F for that exact string on the linked page). Test code: distinctaggregation3.java https://paste.ubuntu.com/p/7HJ9W3hVVN/ Test POJO: OrderStream.java

Flink parallelism question

2021-01-22 Thread Jacob
A question has puzzled me ever since I started using Flink. After setting a parallelism of n, does Flink occupy n CPUs rather than n CPU cores? For example, when consuming a Kafka topic, the usual advice is to set the parallelism to the number of topic partitions so that each parallel instance consumes one partition for best performance. That would mean one parallel instance is one consumer thread and also one slot; and since in Flink one unit of parallelism is said to represent one CPU, can I conclude that one CPU equals one thread? If Flink is deployed standalone on a machine with 4 CPUs of 6 cores each, is the cluster's maximum parallelism then 4?

flink-Kafka error: ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

2021-01-22 Thread lp
Test code: -- public class Sink_KafkaSink_1{ public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromPropertiesFile(Sink_KafkaSink_1.class.getResourceAsStream("/pro.properties")); String host =
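For context, this particular error usually means the org.apache.kafka classes were loaded twice, e.g. once from the fat jar by Flink's child-first user-code classloader and once from a jar in lib. A commonly suggested workaround, offered here as a guess without seeing the actual packaging, is to load Kafka classes parent-first via flink-conf.yaml:

```yaml
# Avoid duplicate loading of Kafka classes by the user-code classloader
classloader.parent-first-patterns.additional: org.apache.kafka
```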

Re: Flink SQL: a statement with a small LIMIT still scans far more data than expected

2021-01-22 Thread zhang hao
OK, thanks everyone; that should be the problem. I'll merge it into my branch and verify. On Fri, Jan 22, 2021 at 11:35 AM Shengkai Fang wrote: > hi, LIMIT push-down was recently merged into the master branch. > > [1] https://github.com/apache/flink/pull/13800 > > Land wrote on Fri, Jan 22, 2021 at 11:28 AM: > > The LIMIT may not be pushed down to MySQL for execution. > > Similar to a problem I ran into: > >