Re: Python StreamExecutionEnvironment from_collection Kafka example

2021-03-12 Thread Shuiqiang Chen
Hi Robert, You can refer to https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py for the whole example. Best, Shuiqiang On Sat, Mar 13, 2021 at 4:01 AM Robert Cullen wrote: > Shuiqiang, Can you include the import statements? thanks.

Re: Flink Read S3 Intellij IDEA Error

2021-03-12 Thread sri hari kali charan Tummala
Same error. On Fri, 12 Mar 2021 at 09:01, Chesnay Schepler wrote: > From the exception I would conclude that your core-site.xml file is not > being picked up. > > AFAIK fs.hdfs.hadoopconf only works for HDFS, not for S3 filesystems, so > try setting HADOOP_CONF_DIR to the directory that the

Re: [Flink SQL] Leniency of JSON parsing

2021-03-12 Thread Magri, Sebastian
I validated it's still accepted by the connector but it's not in the documentation anymore. It doesn't seem to help in my case. Thanks, Sebastian From: Magri, Sebastian Sent: Friday, March 12, 2021 18:50 To: Timo Walther ; ro...@apache.org Cc: user Subject:

Re: Python StreamExecutionEnvironment from_collection Kafka example

2021-03-12 Thread Robert Cullen
Shuiqiang, Can you include the import statements? thanks. On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen wrote: > Hi Robert, > > Kafka Connector is provided in Python DataStream API since release-1.12.0. > And the documentation for it is lacking, we will make it up soon. > > The following code

Re: Python StreamExecutionEnvironment from_collection Kafka example

2021-03-12 Thread Shuiqiang Chen
Hi Robert, The Kafka connector has been available in the Python DataStream API since release-1.12.0. The documentation for it is still lacking; we will add it soon. The following code shows how to apply KafkaConsumers and KafkaProducer: ``` env = StreamExecutionEnvironment.get_execution_environment()
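Since the import statements were asked about in this thread, here is a minimal sketch of a Kafka source and sink in the Python DataStream API, assuming PyFlink 1.12 or later. The broker address, group id, topic names, and the helper name `build_kafka_job` are placeholders invented for illustration, not values from the thread. The PyFlink imports sit inside the function only so the module can be loaded on a machine without PyFlink installed.

```python
# Placeholder connection settings (assumptions, not values from the thread).
KAFKA_PROPS = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-group",
}


def build_kafka_job(source_topic="input-topic", sink_topic="output-topic"):
    """Wire a Kafka source to a Kafka sink and return the environment."""
    # Imports are local so that loading this module does not require PyFlink.
    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import (
        FlinkKafkaConsumer,
        FlinkKafkaProducer,
    )

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # Read each Kafka record as a plain string.
    consumer = FlinkKafkaConsumer(source_topic, SimpleStringSchema(), KAFKA_PROPS)
    # Write each string back out to another topic.
    producer = FlinkKafkaProducer(sink_topic, SimpleStringSchema(), KAFKA_PROPS)

    # add_source takes the place of from_collection in the original question.
    env.add_source(consumer).add_sink(producer)
    return env
```

Calling `build_kafka_job().execute("kafka-example")` would submit the job. This needs a reachable Kafka broker and the Kafka connector jar available to Flink, for example via `env.add_jars(...)`.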

Re: No saving data using rocksdb

2021-03-12 Thread Maminspapin
Roman, thank you for your attention. It looks like you are absolutely right. Thank you very much for helping. Before submitting a job I do the following steps: 1. ./bin/start-cluster.sh 2. ./bin/taskmanager.sh start And in my code there is this line: env.setStateBackend(new

Re: [Flink SQL] Leniency of JSON parsing

2021-03-12 Thread Magri, Sebastian
Hi Roman! Seems like that option is no longer available. Best Regards, Sebastian From: Roman Khachatryan Sent: Friday, March 12, 2021 16:59 To: Magri, Sebastian ; Timo Walther Cc: user Subject: Re: [Flink SQL] Leniency of JSON parsing Hi Sebastian, Did you

Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Vishal Santoshi
Yep, makes sense. On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan wrote: > > Want to confirm that the keys are GCed ( along with state ) once the > (windows close + lateness ) ? > Window state is cleared (as well as the window itself), but global > state is not (unless you use TTL). > > [1]

Re: [Schema Evolution] Cannot restore from savepoint after deleting field from POJO

2021-03-12 Thread Roman Khachatryan
Hi Alexis, This looks like a bug, I've created a Jira ticket to address it [1]. Please feel free to provide any additional information. In particular, whether you are able to reproduce it in any of the subsequent releases. [1] https://issues.apache.org/jira/browse/FLINK-21752 Regards, Roman

Re: Flink Read S3 Intellij IDEA Error

2021-03-12 Thread Chesnay Schepler
From the exception I would conclude that your core-site.xml file is not being picked up. AFAIK fs.hdfs.hadoopconf only works for HDFS, not for S3 filesystems, so try setting HADOOP_CONF_DIR to the directory that the file resides in. On 3/12/2021 5:10 PM, sri hari kali charan Tummala wrote:

Python StreamExecutionEnvironment from_collection Kafka example

2021-03-12 Thread Robert Cullen
I’ve scoured the web looking for an example of using a Kafka source for a DataStream in python. Can someone finish this example? env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) ds = env.from_collection( KAFKA_SOURCE ) ...

Re: Evenly Spreading Out Source Tasks

2021-03-12 Thread Aeden Jameson
Hi Matthias, Yes, all the task managers have the same hardware/memory configuration. Aeden On Fri, Mar 12, 2021 at 3:25 AM Matthias Pohl wrote: > > Hi Aeden, > just to be sure: All task managers have the same hardware/memory > configuration, haven't they? I'm not 100% sure whether this

Handling Bounded Sources with KafkaSource

2021-03-12 Thread Rion Williams
Hi all, I've been using the KafkaSource API as opposed to the classic consumer and things have been going well. I configured my source such that it could be used in either a streaming or bounded mode, with the bounded approach specifically aimed at improving testing (unit/integration). I've

Re: Flink Read S3 Intellij IDEA Error

2021-03-12 Thread sri hari kali charan Tummala
If anyone has working Flink 1.8.1 code that reads from S3 in IntelliJ in a public GitHub repository, please pass it on; that would be a huge help. Thanks Sri On Fri, 12 Mar 2021 at 08:08, sri hari kali charan Tummala < kali.tumm...@gmail.com> wrote: > Which I already did in my pin still its not working. > >

Re: Flink Read S3 Intellij IDEA Error

2021-03-12 Thread sri hari kali charan Tummala
Which I already did in my pom, but it's still not working. Thanks Sri On Fri, 12 Mar 2021 at 06:18, Chesnay Schepler wrote: > The concept of plugins does not exist in 1.8.1. As a result it should be > sufficient for your use-case to add a dependency on flink-s3-fs-hadoop to > your project. > > On

Re: [Flink SQL] Leniency of JSON parsing

2021-03-12 Thread Roman Khachatryan
Hi Sebastian, Did you try setting debezium-json-map-null-key-mode to DROP [1]? I'm also pulling in Timo who might know better. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/debezium.html#debezium-json-map-null-key-mode Regards, Roman On Fri, Mar 12,

Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Roman Khachatryan
> Want to confirm that the keys are GCed ( along with state ) once the > (windows close + lateness ) ? Window state is cleared (as well as the window itself), but global state is not (unless you use TTL). [1]

Re: DataStream in batch mode - handling (un)ordered bounded data

2021-03-12 Thread Dawid Wysakowicz
Hi Alexis, As of now there is no such feature in the DataStream API. The Batch mode in DataStream API is a new feature and we would be interested to hear about the use cases people want to use it for to identify potential areas to improve. What you are suggesting generally makes sense so I think

DataStream in batch mode - handling (un)ordered bounded data

2021-03-12 Thread Alexis Sarda-Espinosa
Hello, Regarding the new BATCH mode of the data stream API, I see that the documentation states that some operators will process all data for a given key before moving on to the next one. However, I don't see how Flink is supposed to know whether the input will provide all data for a given key

Re: Question about statement output results

2021-03-12 Thread Dian Fu
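Could you explain why you think the output should be 1,2,1,2? Personally, I think the current output is fine. For example, the first record is sent to sink1 and then to sink2, so 1 is printed twice; then the second record is processed, and 2 is printed twice. On Mon, Mar 8, 2021 at 9:42 AM 刘杰鸿 wrote: > from pyflink.table import EnvironmentSettings, StreamTableEnvironment > > env_settings = >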

Re: Pyflink error when submitting to a local cluster

2021-03-12 Thread Dian Fu
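From the error, it looks like pyflink cannot be found when the job runs. If that is the case, there are a few solutions: - Specify the cluster-side Python path through the API: set_python_executable, see [1] - Set the python.executable configuration option, see [2] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html [2]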

Re: Flink Read S3 Intellij IDEA Error

2021-03-12 Thread Chesnay Schepler
The concept of plugins does not exist in 1.8.1. As a result it should be sufficient for your use-case to add a dependency on flink-s3-fs-hadoop to your project. On 3/12/2021 4:33 AM, sri hari kali charan Tummala wrote: Let's close this issue guys please answer my questions. I am using Flink

Re: How does Pyflink connect to HBase?

2021-03-12 Thread Dian Fu
1) The PyFlink Table API can use every connector supported in SQL, so the HBase connector is naturally supported too. For usage details, see the documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#how-to-use-connectors 2) For how to use the HBase connector, see:

Re: PyFlink UDTF throws NullPointerException after running for a while

2021-03-12 Thread Dian Fu
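This problem is probably related to this JIRA: https://issues.apache.org/jira/browse/FLINK-21434 It has been fixed on master and in 1.12.3, but 1.12.3 has not been released yet. How about cherry-picking the fix, building a version yourself, and verifying it? On Tue, Mar 9, 2021 at 5:27 PM 梁丁文 wrote: > While testing a PyFlink UDTF, it exits abnormally after running for a few seconds. Below is the UDTF function > > > class Mac(TableFunction): > def eval(self, body_data): >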

Re: Question about pyflink LATERAL TABLE

2021-03-12 Thread Dian Fu
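Which PyFlink version are you using? Also, if convenient, could you provide an example that reproduces the problem easily? On Fri, Mar 12, 2021 at 4:57 PM 陈康 <844256...@qq.com> wrote: > I want a custom UDTF to split a string, but it fails with java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 > I have no clue; has anyone run into this? Thanks! > > class myKerasMLP(ScalarFunction): > > def eval(self, *args): > ... > #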

Re: Guidelines for setting task slots when running multiple jobs in a Flink cluster

2021-03-12 Thread Sush Bankapura
Hi Roman and Till, Thank you very much for your responses. With regard to the workload variation across the jobs, let me put it like this: 1. We have some jobs which are CPU intensive (and only operator state being persisted) and there are other jobs which are not so CPU intensive, but have

[Flink SQL] Leniency of JSON parsing

2021-03-12 Thread Magri, Sebastian
I'm trying to extract data from a Debezium CDC source, in which one of the backing tables has an open schema nested JSON field like this: "objectives": { "items": [ { "id": 1, "label": "test 1" "size": 1000.0 }, { "id":

Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Vishal Santoshi
Sometimes writing it down makes you think. I now realize that this is not the right approach, given that merging windows will have their own states, and how the merge happens is really at the key level. On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi wrote: > I intend to augment every

Re: No saving data using rocksdb

2021-03-12 Thread Maminspapin
Hey Roman, I use the same key every time, and I get the correct value in StateManager every time the processElement() method executes. But then I stop the job and submit it again, and the first execution of processElement() gives me null from the state store. The key wasn't changed. So I'm confused

Re: Guidelines for setting task slots when running multiple jobs in a Flink cluster

2021-03-12 Thread Till Rohrmann
Hi Sushruth, if your jobs need significantly different configurations, then I would suggest thinking about dedicated clusters per job. That way you can configure the cluster to work best for the respective job. Of course, running multiple clusters instead of a single one comes at the cost of more

Re: Filtering lines in parquet

2021-03-12 Thread Avi Levi
Cool, thanks! On Fri, Mar 12, 2021, 13:15 Arvid Heise wrote: > Hi Avi, > > thanks for clarifying. > > It seems like it's not possible to parse Parquet in Flink without knowing > the schema. What i'd do is to parse the metadata while setting up the job > and then pass it to the input format: > >

Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Vishal Santoshi
I intend to augment every event in a session with a unique ID. To keep the session lean, there is a PurgingTrigger on this aggregate that fires on a count of 1. >> (except that the number of keys can grow). Want to confirm that the keys are GCed ( along with state ) once the (windows close +

Re: How to debug checkpoint/savepoint stuck in Flink 1.12.2

2021-03-12 Thread Arvid Heise
Yes, please send me the full stack trace. You could also send it to me personally if you don't want to share it on the ML. I'm especially interested in the legacy source thread that holds the lock 0x00058e8c5070 if you only want to share an excerpt. On Fri, Mar 12, 2021 at 2:29 AM ChangZhuo

Re: Evenly Spreading Out Source Tasks

2021-03-12 Thread Matthias Pohl
Hi Aeden, just to be sure: All task managers have the same hardware/memory configuration, don't they? I'm not 100% sure whether this affects the slot selection in the end, but it looks like this parameter also has an influence on the slot matching strategy preferring slots with less utilization

Re: Guidelines for setting task slots when running multiple jobs in a Flink cluster

2021-03-12 Thread Roman Khachatryan
Hi, Do I understand correctly that: 1. The workload varies across the jobs but stays the same for the same job 2. With a small number of slots per TM you are concerned about uneven resource utilization when running low- and high-intensive jobs on the same cluster simultaneously? If so, wouldn't

Re: How to do code isolation if multiple jobs run on the same taskmanager process?

2021-03-12 Thread Arvid Heise
Hi Lei, yes, metaspace would run out eventually if you run too much in parallel. All finished jobs will close the classloaders and free the metaspace memory. For newer setups, we recommend creating an ad-hoc cluster for each Flink application for this and several other reasons. If you are

Re: Filtering lines in parquet

2021-03-12 Thread Arvid Heise
Hi Avi, thanks for clarifying. It seems like it's not possible to parse Parquet in Flink without knowing the schema. What I'd do is to parse the metadata while setting up the job and then pass it to the input format: ParquetMetadata parquetMetadata = MetadataReader.readFooter(inputStream, path,

Re: User metrics outside tasks

2021-03-12 Thread Arvid Heise
Hi Bob and Alexey, I double-checked and there is currently no way to achieve what you want. The good news is that the OOM part should be addressed through FLINK-20833 [1], maybe it's even suitable for other issues. A "workaround" (I don't think it's a workaround) for your issues would actually

Re: flinksql 1.12.1 error accessing a field in a row

2021-03-12 Thread Peihui He
Executing this function on its own works fine: select Test().a has no problem. On Fri, Mar 12, 2021 at 6:30 PM Peihui He wrote: > hi, all > > Define a ScalarFunction > class Test extends ScalarFunction{ > @DataTypeHint("ROW") > def eval(): Row ={ > Row.of("a", "b", "c") > } > } > > When executing the following statement > select Test().a from taba1 >

Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Roman Khachatryan
Hi Vishal, There is no leak in the code you provided (except that the number of keys can grow). But as you figured out the state is scoped to key, not to window+key. Could you explain what you are trying to achieve and why you need to combine sliding windows with state scoped to window+key?

flinksql 1.12.1 error accessing a field in a row

2021-03-12 Thread Peihui He
hi, all Define a ScalarFunction class Test extends ScalarFunction{ @DataTypeHint("ROW") def eval(): Row ={ Row.of("a", "b", "c") } } When executing the following statement select Test().a from taba1 the following error is reported: java.io.IOException: Fail to run stream sql job at

Re: No saving data using rocksdb

2021-03-12 Thread Roman Khachatryan
Are you starting the job from savepoint [1] when submitting it again? If not, it is considered as a new job and will not pick up the old state. [1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#starting-a-job-from-a-savepoint Regards, Roman On Fri, Mar 12, 2021 at

No saving data using rocksdb

2021-03-12 Thread Maminspapin
I have the following piece of configuration in flink.yaml:
high-availability: zookeeper
high-availability.storageDir: file:///home/flink/flink-ha-data

Re: Does WatermarkStrategy.withIdleness work?

2021-03-12 Thread Dan Hill
Thanks David! On Fri, Mar 12, 2021, 01:54 David Anderson wrote: > WatermarkStrategy.withIdleness works by marking idle streams as idle, so > that downstream operators will ignore those streams and allow the > watermarks to progress based only on the advancement of the watermarks of > the still

Re: Does WatermarkStrategy.withIdleness work?

2021-03-12 Thread David Anderson
WatermarkStrategy.withIdleness works by marking idle streams as idle, so that downstream operators will ignore those streams and allow the watermarks to progress based only on the advancement of the watermarks of the still active streams. As you suspected, this mechanism does not provide for the
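The rule described above can be sketched in plain Python. This is a simplified model for illustration, not Flink's implementation: the function name `combined_watermark` is invented, and the sketch deliberately leaves the all-idle case undefined by returning None.

```python
def combined_watermark(input_watermarks, idle_flags):
    """Return the watermark a downstream operator would follow.

    Inputs marked idle are ignored; the result is the minimum watermark
    over the inputs that are still active.
    """
    active = [
        watermark
        for watermark, is_idle in zip(input_watermarks, idle_flags)
        if not is_idle
    ]
    if not active:
        # Every input is idle: this simplified model has nothing to report.
        return None
    return min(active)


# With all inputs active, the slowest input (3) holds everything back.
all_active = combined_watermark([10, 3, 7], [False, False, False])
# Once the slow input is marked idle, the remaining inputs decide.
one_idle = combined_watermark([10, 3, 7], [False, True, False])
```

In this model, marking the lagging input as idle lets the combined watermark move from 3 to 7, which is the behaviour the message describes.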

Question about pyflink LATERAL TABLE

2021-03-12 Thread 陈康
I want a custom UDTF to split a string, but it fails with java.lang.IndexOutOfBoundsException: Index: 7, Size: 7 I have no clue; has anyone run into this? Thanks! class myKerasMLP(ScalarFunction): def eval(self, *args): ... # return the prediction result return str(trueY[0][0]) + '|' + str(trueY[0][1]) Register the UDF myKerasMLP = udf(myKerasMLP(),

Does WatermarkStrategy.withIdleness work?

2021-03-12 Thread Dan Hill
I haven't been able to get WatermarkStrategy.withIdleness to work. Is it broken? None of my timers trigger when I'd expect idleness to take over. On Tue, Mar 2, 2021 at 11:15 PM Dan Hill wrote: > Hi. > > For local and tests development, I want to flush the events in my system > to make sure

Re: Gradually increasing checkpoint size

2021-03-12 Thread Dan Hill
I figured it out. I have some records with the same key and I was doing an IntervalJoin. One of the IntervalJoin implementations that I found looks like its runtime increases exponentially when there are duplicate keys. I introduced a de-duping step and it works a lot faster. On Thu, Mar 11,
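To illustrate why duplicate records can make an interval join so expensive, here is a toy model in plain Python. It is not Flink's IntervalJoin; the function names and the (key, timestamp) record shape are assumptions made for the example. In this model every left record is paired with every matching right record, so n duplicates on each side produce n * n output pairs.

```python
from collections import defaultdict


def interval_join(left, right, lower, upper):
    """Join (key, ts) records where left_ts + lower <= right_ts <= left_ts + upper."""
    right_by_key = defaultdict(list)
    for key, ts in right:
        right_by_key[key].append(ts)

    joined = []
    for key, left_ts in left:
        for right_ts in right_by_key[key]:
            if left_ts + lower <= right_ts <= left_ts + upper:
                joined.append((key, left_ts, right_ts))
    return joined


def dedupe(records):
    """Drop exact duplicate records while keeping first-seen order."""
    return list(dict.fromkeys(records))


# 100 identical records on each side of the join.
left = [("k", 0)] * 100
right = [("k", 1)] * 100

pairs_with_duplicates = len(interval_join(left, right, 0, 5))
pairs_after_dedupe = len(interval_join(dedupe(left), dedupe(right), 0, 5))
```

The de-duplication step here removes only exact duplicates; what counts as a duplicate in a real job depends on the data.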

Re: No saving data using rocksdb

2021-03-12 Thread Roman Khachatryan
Hi Yuri, The state that you access with getRuntimeContext().getState(...) is scoped to the key (so for every new key this state will be null). What key do you use? Regards, Roman On Fri, Mar 12, 2021 at 7:22 AM Maminspapin wrote: > > I have following piece of configuration in flink.yaml: > >
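A small plain-Python model of key-scoped state may make this concrete. It only mimics the behaviour described above and is not Flink's API; the class `KeyedValueState` and the function `process_element` are hypothetical names chosen for the sketch.

```python
class KeyedValueState:
    """A value state that keeps one slot per key, like keyed state does."""

    def __init__(self):
        self._values = {}
        self._current_key = None

    def set_current_key(self, key):
        # The runtime switches the key before each element is processed.
        self._current_key = key

    def value(self):
        # Returns None when nothing was stored for the current key yet.
        return self._values.get(self._current_key)

    def update(self, new_value):
        self._values[self._current_key] = new_value


def process_element(state, key, element):
    """Store the element and return what was previously stored for its key."""
    state.set_current_key(key)
    previous = state.value()
    state.update(element)
    return previous


state = KeyedValueState()
first_for_a = process_element(state, "a", 1)   # nothing stored for "a" yet
second_for_a = process_element(state, "a", 2)  # sees the value stored above
first_for_b = process_element(state, "b", 3)   # a new key starts empty again
```

A fresh `KeyedValueState()` also starts empty for every key, which loosely mirrors a job that is resubmitted without restoring earlier state.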

Re: Flink savepoint migration problem

2021-03-12 Thread 赵 建云
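Confirmed: a field was added inside pulsar's MessageId implementation class, which caused flink to fail during deserialization. The specific issue: https://github.com/streamnative/pulsar-flink/issues/256. I will upgrade the checkpoint handling of the pulsar connector for flink 1.9 so that MessageId serialization uses a serializer based on `MessageId.toByteArray`. Thank you very much for your help. Jianyun8023 2021-03-12 On Mar 11, 2021, at 10:43 PM, Kezhu Wang <kez...@gmail.com> wrote: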