Hi Robert,
You can refer to
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
for the whole example.
Best,
Shuiqiang
Robert Cullen wrote on Sat, Mar 13, 2021 at 4:01 AM:
> Shuiqiang, Can you include the import statements? thanks.
>
>
Same error.
On Fri, 12 Mar 2021 at 09:01, Chesnay Schepler wrote:
> From the exception I would conclude that your core-site.xml file is not
> being picked up.
>
> AFAIK fs.hdfs.hadoopconf only works for HDFS, not for S3 filesystems, so
> try setting HADOOP_CONF_DIR to the directory that the file resides in.
I validated that it's still accepted by the connector, but it is no longer in
the documentation.
It doesn't seem to help in my case.
Thanks,
Sebastian
From: Magri, Sebastian
Sent: Friday, March 12, 2021 18:50
To: Timo Walther ; ro...@apache.org
Cc: user
Subject:
Shuiqiang, Can you include the import statements? thanks.
On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen wrote:
> Hi Robert,
>
> The Kafka connector has been provided in the Python DataStream API since release 1.12.0.
> The documentation for it is currently lacking; we will add it soon.
>
> The following code
Hi Robert,
The Kafka connector has been provided in the Python DataStream API since release 1.12.0.
The documentation for it is currently lacking; we will add it soon.
The following code shows how to apply a KafkaConsumer and a KafkaProducer (a
minimal sketch; topic names and the broker address are placeholders):
```
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer

env = StreamExecutionEnvironment.get_execution_environment()
# The Kafka connector jar must be available, e.g. added via env.add_jars(...).
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
consumer = FlinkKafkaConsumer('input_topic', SimpleStringSchema(), kafka_props)
producer = FlinkKafkaProducer('output_topic', SimpleStringSchema(), kafka_props)
env.add_source(consumer).add_sink(producer)
env.execute('kafka_example')
```
Roman, thank you for your attention.
It looks like you are absolutely right. Thank you very much for helping.
Before submitting a job I do the following steps:
1. ./bin/start-cluster.sh
2. ./bin/taskmanager.sh start
And in my code there is this line:
env.setStateBackend(new
Hi Roman!
Seems like that option is no longer available.
Best Regards,
Sebastian
From: Roman Khachatryan
Sent: Friday, March 12, 2021 16:59
To: Magri, Sebastian ; Timo Walther
Cc: user
Subject: Re: [Flink SQL] Leniency of JSON parsing
Hi Sebastian,
Did you
Yep, makes sense.
On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan wrote:
> > Want to confirm that the keys are GCed ( along with state ) once the
> (windows close + lateness ) ?
> Window state is cleared (as well as the window itself), but global
> state is not (unless you use TTL).
>
> [1]
Hi Alexis,
This looks like a bug, I've created a Jira ticket to address it [1].
Please feel free to provide any additional information.
In particular, whether you are able to reproduce it in any of the
subsequent releases.
[1]
https://issues.apache.org/jira/browse/FLINK-21752
Regards,
Roman
From the exception I would conclude that your core-site.xml file is not
being picked up.
AFAIK fs.hdfs.hadoopconf only works for HDFS, not for S3 filesystems, so
try setting HADOOP_CONF_DIR to the directory that the file resides in.
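For example (the directory is a placeholder; point it at wherever core-site.xml actually lives on your machines):

```shell
# Make the Hadoop configuration directory visible to Flink
# (the path below is a placeholder for this sketch).
export HADOOP_CONF_DIR=/etc/hadoop/conf
echo "$HADOOP_CONF_DIR"
```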
On 3/12/2021 5:10 PM, sri hari kali charan Tummala wrote:
I’ve scoured the web looking for an example of using a Kafka source for a
DataStream in python. Can someone finish this example?
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...
--
Robert Cullen
240-475-4490
Hi Matthias,
Yes, all the task managers have the same hardware/memory configuration.
Aeden
On Fri, Mar 12, 2021 at 3:25 AM Matthias Pohl wrote:
>
> Hi Aeden,
> just to be sure: All task managers have the same hardware/memory
> configuration, don't they? I'm not 100% sure whether this
Hi all,
I've been using the KafkaSource API as opposed to the classic consumer and
things have been going well. I configured my source such that it could be
used in either a streaming or bounded mode, with the bounded approach
specifically aimed at improving testing (unit/integration).
I've
If anyone has Flink 1.8.1 code reading from S3 in IntelliJ in a public
GitHub repo, please pass it on; that would be a huge help.
Thanks
Sri
On Fri, 12 Mar 2021 at 08:08, sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:
> Which I already did in my pom; still it's not working.
>
>
Which I already did in my pom; still it's not working.
Thanks
Sri
On Fri, 12 Mar 2021 at 06:18, Chesnay Schepler wrote:
> The concept of plugins does not exist in 1.8.1. As a result it should be
> sufficient for your use-case to add a dependency on flink-s3-fs-hadoop to
> your project.
>
> On
Hi Sebastian,
Did you try setting debezium-json-map-null-key-mode to DROP [1]?
I'm also pulling in Timo who might know better.
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/debezium.html#debezium-json-map-null-key-mode
Regards,
Roman
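For reference, a sketch of where that option would go in a CREATE TABLE definition (the table name and columns are made up; the option key follows the linked docs page):

```sql
CREATE TABLE example_table (           -- hypothetical table/columns
  objectives MAP<STRING, STRING>
) WITH (
  'connector' = 'kafka',
  'format' = 'debezium-json',
  'debezium-json.map-null-key.mode' = 'DROP'  -- drop map entries with null keys
);
```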
On Fri, Mar 12,
> Want to confirm that the keys are GCed ( along with state ) once the
> (windows close + lateness ) ?
Window state is cleared (as well as the window itself), but global
state is not (unless you use TTL).
[1]
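The TTL behavior mentioned above can be illustrated with a small plain-Python sketch; this only models the semantics and is not Flink's StateTtlConfig API:

```python
import time

class TtlValueState:
    """Toy keyed value state whose entries expire after ttl seconds
    (an illustration of TTL semantics, not a Flink API)."""
    def __init__(self, ttl, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._entries = {}  # key -> (value, last_update_timestamp)

    def update(self, key, value):
        self._entries[key] = (value, self.clock())

    def value(self, key):
        entry = self._entries.get(key)
        if entry is None:
            return None
        val, ts = entry
        if self.clock() - ts >= self.ttl:
            del self._entries[key]  # expired: cleared like TTL'd state
            return None
        return val

# Simulated clock so the example is deterministic.
now = [0.0]
state = TtlValueState(ttl=60, clock=lambda: now[0])
state.update("k", 42)
now[0] = 30.0
print(state.value("k"))   # within TTL -> 42
now[0] = 100.0
print(state.value("k"))   # past TTL -> None
```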
Hi Alexis,
As of now there is no such feature in the DataStream API. The Batch mode
in DataStream API is a new feature and we would be interested to hear
about the use cases people want to use it for to identify potential
areas to improve. What you are suggesting generally makes sense so I
think
Hello,
Regarding the new BATCH mode of the data stream API, I see that the
documentation states that some operators will process all data for a given key
before moving on to the next one. However, I don't see how Flink is supposed to
know whether the input will provide all data for a given key
Can you explain why you think the output should be 1,2,1,2?
Personally I think the current output is correct: for example, the first record is sent to sink1 and then to sink2, so two 1s are printed; then the second record is processed, printing two 2s.
On Mon, Mar 8, 2021 at 9:42 AM 刘杰鸿 wrote:
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
>
> env_settings =
>
From the error, it seems that pyflink cannot be found when the job runs. If that is the case, there are a few solutions:
- Specify the Python path on the cluster side via the API: set_python_executable, see [1]
- Configure python.executable, see [2]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html
[2]
The concept of plugins does not exist in 1.8.1. As a result it should be
sufficient for your use-case to add a dependency on flink-s3-fs-hadoop
to your project.
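A sketch of the Maven dependency (the version should match your Flink version; 1.8.1 is assumed here because that is the version under discussion):

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-s3-fs-hadoop</artifactId>
  <version>1.8.1</version>
</dependency>
```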
On 3/12/2021 4:33 AM, sri hari kali charan Tummala wrote:
Let's close this issue, guys; please answer my questions. I am using
Flink
1) All connectors supported in SQL can also be used in the PyFlink Table API, so the HBase
connector is naturally supported too; see the documentation for how to use it:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#how-to-use-connectors
2) For how to use the HBase connector, see:
This problem is probably related to this JIRA: https://issues.apache.org/jira/browse/FLINK-21434
It has been fixed on master and for 1.12.3, but 1.12.3 has not been released yet. Could you cherry-pick the fix, build a version yourself, and verify it?
On Tue, Mar 9, 2021 at 5:27 PM 梁丁文 wrote:
> I was testing a PyFlink UDTF and it exits abnormally after running for a few seconds. Below is the UDTF function:
>
>
> class Mac(TableFunction):
> def eval(self, body_data):
>
Which PyFlink version are you using? Also, if convenient, could you provide an example that easily reproduces the issue?
On Fri, Mar 12, 2021 at 4:57 PM 陈康 <844256...@qq.com> wrote:
> I wrote a custom UDTF to split strings, but it fails with java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
> I have no clue; has anyone run into this? Thanks!
>
> class myKerasMLP(ScalarFunction):
>
> def eval(self, *args):
> ...
> #
Hi Roman and Till,
Thank you very much for your responses.
With regard to the workload variation across the jobs, let me put it like this:
1. We have some jobs which are CPU intensive (and only operator state being
persisted) and there are other jobs which are not so CPU intensive, but have
I'm trying to extract data from a Debezium CDC source, in which one of the
backing tables has an open schema nested JSON field like this:
"objectives": {
"items": [
{
"id": 1,
"label": "test 1"
"size": 1000.0
},
{
"id":
Sometimes writing it down makes you think. I now realize that this is not
the right approach, given that merging windows will have their own
states, and how the merge happens is really at the key level.
On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi
wrote:
> I intend to augment every
Hey Roman,
I use the same key every time.
And I get the correct value in StateManager every time the processElement()
method executes.
But then I stop the job and submit it again.
And on the first execution processElement() gets null from the state store. The
key wasn't changed.
So I'm confused
Hi Sushruth,
if your jobs need significantly different configurations, then I would
suggest thinking about dedicated clusters per job. That way you can
configure the cluster to work best for the respective job. Of course,
running multiple clusters instead of a single one comes at the cost of more
Cool, thanks!
On Fri, Mar 12, 2021, 13:15 Arvid Heise wrote:
> Hi Avi,
>
> thanks for clarifying.
>
> It seems like it's not possible to parse Parquet in Flink without knowing
> the schema. What I'd do is to parse the metadata while setting up the job
> and then pass it to the input format:
>
>
I intend to augment every event in a session with a unique ID. To keep
the session lean, there is a PurgingTrigger on this aggregate that fires
on a count of 1.
>> (except that the number of keys can grow).
Want to confirm that the keys are GCed ( along with state ) once the
(windows close +
Yes, please send me the full stack trace. You could also send it to me
personally if you don't want to share it on the ML.
I'm especially interested in the legacy source thread that holds the lock
0x00058e8c5070 if you only want to share an excerpt.
On Fri, Mar 12, 2021 at 2:29 AM ChangZhuo
Hi Aeden,
just to be sure: All task managers have the same hardware/memory
configuration, don't they? I'm not 100% sure whether this affects the
slot selection in the end, but it looks like this parameter also has an
influence on the slot matching strategy, preferring slots with less
utilization
Hi,
Do I understand correctly that:
1. The workload varies across the jobs but stays the same for the same job
2. With a small number of slots per TM you are concerned about uneven
resource utilization when running low- and high-intensive jobs on the
same cluster simultaneously?
If so, wouldn't
Hi Lei,
yes, metaspace would run out eventually if you run too much in parallel.
All finished jobs will close the classloaders and free the metaspace memory.
For newer setups, we recommend creating an ad-hoc cluster for each Flink
application for this and several other reasons. If you are
Hi Avi,
thanks for clarifying.
It seems like it's not possible to parse Parquet in Flink without knowing
the schema. What I'd do is to parse the metadata while setting up the job
and then pass it to the input format:
ParquetMetadata parquetMetadata =
MetadataReader.readFooter(inputStream, path,
Hi Bob and Alexey,
I double-checked and there is currently no way to achieve what you want.
The good news is that the OOM part should be addressed through FLINK-20833
[1], maybe it's even suitable for other issues.
A "workaround" (I don't think it's a workaround) for your issues would
actually
If the function is executed on its own there is no problem;
select Test().a works fine.
Peihui He wrote on Fri, Mar 12, 2021 at 6:30 PM:
> hi, all
>
> 定义一个 ScalarFunction
> class Test extends ScalarFunction{
> @DataTypeHint("ROW")
> def eval(): Row ={
> Row.of("a", "b", "c")
> }
> }
>
> When executing the following statement:
> select Test().a from taba1
>
Hi Vishal,
There is no leak in the code you provided (except that the number of
keys can grow).
But as you figured out the state is scoped to key, not to window+key.
Could you explain what you are trying to achieve and why do you need to combine
sliding windows with state scoped to window+key?
hi, all
Define a ScalarFunction:
class Test extends ScalarFunction{
@DataTypeHint("ROW")
def eval(): Row ={
Row.of("a", "b", "c")
}
}
When executing the following statement:
select Test().a from taba1
The following error is reported:
java.io.IOException: Fail to run stream sql job at
Are you starting the job from savepoint [1] when submitting it again?
If not, it is considered as a new job and will not pick up the old state.
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#starting-a-job-from-a-savepoint
Regards,
Roman
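The restore flow can be sketched with the CLI (the job ID, savepoint directory, and jar path are placeholders):

```shell
# Trigger a savepoint for the running job (IDs/paths are placeholders):
./bin/flink savepoint <jobId> /tmp/savepoints
# Cancel, then resubmit from the savepoint so the old state is picked up:
./bin/flink run -s /tmp/savepoints/savepoint-xxxx /path/to/job.jar
```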
On Fri, Mar 12, 2021 at
I have the following piece of configuration in flink.yaml:
high-availability: zookeeper
high-availability.storageDir: file:///home/flink/flink-ha-data
Thanks David!
On Fri, Mar 12, 2021, 01:54 David Anderson wrote:
> WatermarkStrategy.withIdleness works by marking idle streams as idle, so
> that downstream operators will ignore those streams and allow the
> watermarks to progress based only on the advancement of the watermarks of
> the still
WatermarkStrategy.withIdleness works by marking idle streams as idle, so
that downstream operators will ignore those streams and allow the
watermarks to progress based only on the advancement of the watermarks of
the still active streams. As you suspected, this mechanism does not provide
for the
I wrote a custom UDTF to split strings, but it fails with java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
I have no clue; has anyone run into this? Thanks!
class myKerasMLP(ScalarFunction):
def eval(self, *args):
...
# return the prediction result
return str(trueY[0][0]) + '|' + str(trueY[0][1])
Register the UDF:
myKerasMLP = udf(myKerasMLP(),
I haven't been able to get WatermarkStrategy.withIdleness to work. Is it
broken? None of my timers trigger when I'd expect idleness to take over.
On Tue, Mar 2, 2021 at 11:15 PM Dan Hill wrote:
> Hi.
>
> For local and tests development, I want to flush the events in my system
> to make sure
I figured it out. I have some records with the same key and I was doing an
IntervalJoin. In one of the IntervalJoin implementations that I found, it
looks like the runtime increases exponentially when there are duplicate keys.
I introduced a de-duping step and it works a lot faster.
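A de-duping step like the one described can be sketched in plain Python (the event shape and key function here are hypothetical; in a real job this would be a keyed operator in front of the interval join):

```python
def dedupe(events, key_fn):
    """Keep only the first event seen for each key (hypothetical helper,
    not a Flink API)."""
    seen = set()
    out = []
    for event in events:
        key = key_fn(event)
        if key not in seen:
            seen.add(key)
            out.append(event)
    return out

# Example: duplicate keys collapse to one record before joining.
events = [{"id": 1, "v": "a"}, {"id": 1, "v": "b"}, {"id": 2, "v": "c"}]
print(dedupe(events, key_fn=lambda e: e["id"]))
# → [{'id': 1, 'v': 'a'}, {'id': 2, 'v': 'c'}]
```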
On Thu, Mar 11,
Hi Yuri,
The state that you access with getRuntimeContext().getState(...) is
scoped to the key (so for every new key this state will be null).
What key do you use?
Regards,
Roman
On Fri, Mar 12, 2021 at 7:22 AM Maminspapin wrote:
>
> I have following piece of configuration in flink.yaml:
>
>
Confirmed: the implementation class of Pulsar's MessageId gained extra internal fields, which caused Flink to fail during deserialization. See the issue: https://github.com/streamnative/pulsar-flink/issues/256.
I will update checkpointing in the Flink 1.9 Pulsar connector so that MessageId serialization uses a serializer based on `MessageId.toByteArray`.
Thank you very much for your help.
Jianyun8023
2021-03-12
On Mar 11, 2021, at 10:43 PM, Kezhu Wang <kez...@gmail.com> wrote: