Re: Evenly Spreading Out Source Tasks

2021-03-14 Thread Xintong Song
Hi Aeden,

IIUC, the topic being read has 36 partitions, which means that your source task
has a parallelism of 36. What's the parallelism of the other tasks? Is the job
making use of all 72 (18 TMs * 4 slots/TM) slots?

I'm afraid currently there's no good way to guarantee subtasks of a task
are spread out evenly.

The configuration option you mentioned makes sure slots are allocated from
TMs evenly; it does not affect how tasks are distributed over the allocated
slots.
E.g., say your job has two tasks A & B, with parallelism 36 & 54
respectively. That means, with the default slot sharing strategy, your job
needs 54 slots in total to be executed. With the configuration enabled, it
is guaranteed that 3 slots are occupied on each TM. For B (parallelism
54), there's a subtask deployed in each slot, thus 3 subtasks on each TM.
As for A, only 36 of the slots contain a subtask of it, and there's no
guarantee which 36 out of the 54 those are.

Thank you~

Xintong Song



On Mon, Mar 15, 2021 at 3:54 AM Chesnay Schepler  wrote:

> Is this a brand-new job, with the cluster having all 18 TMs at the time
> of submission? (or did you add more TMs while the job was running)
>
> On 3/12/2021 5:47 PM, Aeden Jameson wrote:
> > Hi Matthias,
> >
> > Yes, all the task managers have the same hardware/memory configuration.
> >
> > Aeden
> >
> > On Fri, Mar 12, 2021 at 3:25 AM Matthias Pohl 
> wrote:
> >> Hi Aeden,
> >> just to be sure: All task managers have the same hardware/memory
> configuration, don't they? I'm not 100% sure whether this affects the
> slot selection in the end, but it looks like this parameter has also an
> influence on the slot matching strategy preferring slots with less
> utilization of resources [1].
> >>
> >> I'm gonna add Chesnay to the thread. He might have more insights here.
> @Chesnay are there any other things that might affect the slot selection
> when actually trying to evenly spread out the slots?
> >>
> >> Matthias
> >>
> >> [1]
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java#L141
> >>
> >> On Fri, Mar 12, 2021 at 12:58 AM Aeden Jameson 
> wrote:
> >>> Hi Arvid,
> >>>
> >>>Thanks for responding. I did check the configuration tab of the job
> >>> manager and the setting cluster.evenly-spread-out-slots: true is
> >>> there. However I'm still observing unevenness in the distribution of
> >>> source tasks. Perhaps this additional information could shed light.
> >>>
> >>> Version: 1.12.1
> >>> Deployment Mode: Application
> >>> Deployment Type: Standalone,  Docker on Kubernetes using the Lyft
> >>> Flink operator https://github.com/lyft/flinkk8soperator
> >>>
> >>> I did place the setting under the flinkConfig section,
> >>>
> >>> apiVersion: flink.k8s.io/v1beta1
> >>> 
> >>> spec:
> >>>flinkConfig:
> >>>  cluster.evenly-spread-out-slots: true
> >>>  high-availability: zookeeper
> >>>  ...
> >>>  state.backend: filesystem
> >>>  ...
> >>>jobManagerConfig:
> >>>  envConfig:
> >>>  
> >>>
> >>> Would you explain how the setting ends up evenly distributing active
> >>> Kafka consumers? Is it a result of just assigning tasks to TM1, TM2,
> >>> TM3 ... TM18 in order and starting again? In my case I have 36
> >>> partitions and 18 nodes so after the second pass in assignment I would
> >>> end up with 2 subtasks in the consumer group on each TM. And then
> >>> subsequent passes result in inactive consumers.
> >>>
> >>>
> >>> Thank you,
> >>> Aeden
> >>>
> >>> On Thu, Mar 11, 2021 at 5:26 AM Arvid Heise  wrote:
>  Hi Aeden,
> 
>  the option that you mentioned should have actually caused your
> desired behavior. Can you double-check that it's set for the job (you can
> look at the config in the Flink UI to be 100% sure).
> 
>  Another option is to simply give all task managers 2 slots. In that
> way, the scheduler can only evenly distribute.
> 
>  On Wed, Mar 10, 2021 at 7:21 PM Aeden Jameson <
> aeden.jame...@gmail.com> wrote:
> >  I have a cluster with 18 task managers, 4 task slots each,
> running a
> > job whose source/sink(s) are declared with FlinkSQL using the Kafka
> > connector. The topic being read has 36 partitions. The problem I'm
> > observing is that the subtasks for the sources are not evenly
> > distributed. For example, 1 task manager will have 4 active source
> > subtasks and other TMs none. Is there a way to force each task
> > manager to have 2 active source subtasks? I tried using the setting
> > cluster.evenly-spread-out-slots: true , but that didn't have the
> > desired effect.
> >
> > --
> > Thank you,
> > Aeden
>
>
>


Got “pyflink.util.exceptions.TableException: findAndCreateTableSource failed.” when running PyFlink example

2021-03-14 Thread Yik San Chan
(The question is cross-posted on StackOverflow
https://stackoverflow.com/questions/66632765/got-pyflink-util-exceptions-tableexception-findandcreatetablesource-failed-w
)

I am running the PyFlink program below (copied from
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html
)

```python
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('/tmp/input')) \
.with_format(OldCsv()
 .field('word', DataTypes.STRING())) \
.with_schema(Schema()
 .field('word', DataTypes.STRING())) \
.create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
.with_format(OldCsv()
 .field_delimiter('\t')
 .field('word', DataTypes.STRING())
 .field('count', DataTypes.BIGINT())) \
.with_schema(Schema()
 .field('word', DataTypes.STRING())
 .field('count', DataTypes.BIGINT())) \
.create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
```

To verify it works, I did the following in order:

1. Run `echo -e  "flink\npyflink\nflink" > /tmp/input`
2. Run `python WordCount.py`
3. Run `cat /tmp/output` and find the expected output

Then I changed my PyFlink program a bit to prefer SQL over the Table API, but I
found it doesn't work.

```python
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

my_source_ddl = """
create table mySource (
word VARCHAR
) with (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '/tmp/input'
)
"""

my_sink_ddl = """
create table mySink (
word VARCHAR,
`count` BIGINT
) with (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '/tmp/output'
)
"""

t_env.sql_update(my_source_ddl)
t_env.sql_update(my_sink_ddl)

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
```

Here's the error:

```
Traceback (most recent call last):
  File "WordCount.py", line 38, in 
.execute_insert('mySink').wait()
  File
"/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/table.py",
line 864, in execute_insert
return TableResult(self._j_table.executeInsert(table_path, overwrite))
  File
"/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_gateway.py",
line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File
"/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py",
line 162, in deco
raise java_exception
pyflink.util.exceptions.TableException: findAndCreateTableSink failed.
at
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSink(TableFactoryUtil.java:87)
at
org.apache.flink.table.api.internal.TableEnvImpl.getTableSink(TableEnvImpl.scala:1097)
at org.apache.flink.table.api.internal.TableEnvImpl.org
$apache$flink$table$api$internal$TableEnvImpl$$writeToSinkAndTranslate(TableEnvImpl.scala:929)
at
org.apache.flink.table.api.internal.TableEnvImpl$$anonfun$1.apply(TableEnvImpl.scala:556)
at
org.apache.flink.table.api.internal.TableEnvImpl$$anonfun$1.apply(TableEnvImpl.scala:554)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.api.internal.TableEnvImpl.executeInternal(TableEnvImpl.scala:554)
at
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at

Re: 1.12 yarn-per-job job submission fails

2021-03-14 Thread smq
Thanks for the explanation.




-- Original Message --
From: Paul Lam
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#yarn-application-queue


Re: 1.12 yarn-per-job job submission fails

2021-03-14 Thread smq
Thanks for the answer.




-- Original Message --
From: Paul Lam
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#yarn-application-queue


Re: 1.12 yarn-per-job job submission fails

2021-03-14 Thread Paul Lam
Starting from Flink 1.12, the YARN-specific parameters such as -yqu have been removed; you can use [1] instead.
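
A rough example of the replacement (a sketch; the queue name and jar below are placeholders, assuming the generic CLI with -t yarn-per-job):

```
./bin/flink run -t yarn-per-job -Dyarn.application.queue=my-queue my-job.jar
```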

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#yarn-application-queue


Best,
Paul Lam

> On Mar 15, 2021, at 10:22, smq <374060...@qq.com> wrote:
> 
> When I submit with this command, I get
> flink Application rejected by queue placement policy
> which suggests no queue was specified.
> But I did add the -yqu parameter to the command, and when I check the queue in the web UI it is not the one I specified but default.
> Also, the job runs fine when I submit it with the old command. Has anyone run into this problem?



Why does Upsert Kafka require an INSERT-ONLY format?

2021-03-14 Thread 刘首维
Hi all,



I have been testing Upsert Kafka recently. During validation I found that the
format's changelog-mode is required to be insert-only. What is the reason for this?

If I have misunderstood something, please correct me directly. Thanks.





Flink version 1.12.1


Re: Python DataStream API Questions -- Java/Scala Interoperability?

2021-03-14 Thread Shuiqiang Chen
Hi Kevin,

Sorry for the late reply.

Actually, you are able to pass arguments to the constructor of the Java
object when instantiating it in Python. Basic data types
(char/boolean/int/long/float/double/string, etc.) can be passed directly,
while complex types (array/list/map/POJO, etc.) must be converted to Java
objects before passing. Please refer to
https://www.py4j.org/py4j_java_collections.html for more information.
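
For example, a rough sketch of what that conversion looks like (the
`com.mycompany.MyBigTableSink` class and its constructor signatures below are
made up for illustration):

```python
from py4j.java_collections import ListConverter
from pyflink.java_gateway import get_gateway

gateway = get_gateway()

# basic types (str/int/float/bool, ...) can be passed straight through
j_sink = gateway.jvm.com.mycompany.MyBigTableSink("my-table", 1000)

# complex types such as a Python list must be converted to a Java collection first
j_column_families = ListConverter().convert(["cf1", "cf2"], gateway._gateway_client)
j_sink = gateway.jvm.com.mycompany.MyBigTableSink("my-table", j_column_families)

# the resulting JavaObject can then be wrapped, e.g. ds.add_sink(SinkFunction(j_sink))
```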

Best,
Shuiqiang

Kevin Lam  于2021年3月11日周四 上午4:28写道:

> A follow-up question--In the example you provided Shuiqiang, there were no
> arguments passed to the constructor of the custom sink/source.
>
> What's the best way to pass arguments to the constructor?
>
> On Fri, Mar 5, 2021 at 4:29 PM Kevin Lam  wrote:
>
>> Thanks Shuiqiang! That's really helpful, we'll give the connectors a try.
>>
>> On Wed, Mar 3, 2021 at 4:02 AM Shuiqiang Chen 
>> wrote:
>>
>>> Hi Kevin,
>>>
> >>> Thank you for your questions. Currently, users are not able to define
> >>> custom sources/sinks in Python. This is a great feature that could unify
> >>> end-to-end PyFlink application development in Python, and it is a large topic
> >>> that we have no plan to support at present.
> >>>
> >>> As you have noticed, `the Python DataStream API has several
> >>> connectors [2] that use Py4J+Java gateways to interoperate with Java
> >>> source/sinks`. These connectors are extensions of the Python abstract
> >>> classes named `SourceFunction` and `SinkFunction`. These two classes can
> >>> accept a Java source/sink instance and maintain it to enable the
> >>> interoperation between Python and Java. They can also accept a string with
> >>> the full name of a Java/Scala-defined Source/SinkFunction class and create
> >>> the corresponding Java instance. Below are the definitions of these classes:
>>>
>>> class JavaFunctionWrapper(object):
>>> """
>>> A wrapper class that maintains a Function implemented in Java.
>>> """
>>>
>>> def __init__(self, j_function: Union[str, JavaObject]):
>>> # TODO we should move this part to the get_java_function() to 
>>> perform a lazy load.
>>> if isinstance(j_function, str):
>>> j_func_class = get_gateway().jvm.__getattr__(j_function)
>>> j_function = j_func_class()
>>> self._j_function = j_function
>>>
>>> def get_java_function(self):
>>> return self._j_function
>>>
>>>
>>>
>>> class SourceFunction(JavaFunctionWrapper):
>>> """
>>> Base class for all stream data source in Flink.
>>> """
>>>
>>> def __init__(self, source_func: Union[str, JavaObject]):
>>> """
>>> Constructor of SinkFunction.
>>>
>>> :param source_func: The java SourceFunction object.
>>> """
>>> super(SourceFunction, self).__init__(source_func)
>>>
>>>
>>> class SinkFunction(JavaFunctionWrapper):
>>> """
>>> The base class for SinkFunctions.
>>> """
>>>
>>> def __init__(self, sink_func: Union[str, JavaObject]):
>>> """
>>> Constructor of SinkFunction.
>>>
>>> :param sink_func: The java SinkFunction object or the full name of the
>>> SinkFunction class.
>>> """
>>> super(SinkFunction, self).__init__(sink_func)
>>>
> >>> Therefore, you are able to define custom sources/sinks in Scala and
>>> apply them in Python. Here is the recommended approach for implementation:
>>>
>>> class MyBigTableSink(SinkFunction):
>>> def __init__(self, class_name: str):
>>> super(MyBigTableSink, self).__init__(class_name)
>>>
>>>
>>> def example():
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> env.add_jars('/the/path/of/your/MyBigTableSink.jar')
>>> # ...
>>> ds.add_sink(MyBigTableSink("com.mycompany.MyBigTableSink"))
>>> env.execute("Application with Custom Sink")
>>>
>>>
>>> if __name__ == '__main__':
>>> example()
>>>
> >>> Remember that you must add the jar of the Scala-defined SinkFunction by
> >>> calling `env.add_jars()` before adding the SinkFunction. And your custom
> >>> source/sink functions must be extensions of `SourceFunction` and
> >>> `SinkFunction`.
>>>
>>> Any further questions are welcomed!
>>>
>>> Best,
>>> Shuiqiang
>>>
>>>
>>> Kevin Lam  于2021年3月3日周三 上午2:50写道:
>>>
 Hello everyone,

 I have some questions about the Python API that hopefully folks in the
 Apache Flink community can help with.

 A little background, I’m interested in using the Python Datastream API
 because of stakeholders who don’t have a background in Scala/Java, and
 would prefer Python if possible. Our team is open to maintaining Scala
 constructs on our end, however we are looking to expose Flink for stateful
 streaming via a Python API to end-users.

 Questions:

 1/ The docs mention that custom Sources and Sinks cannot be defined in
 Python, but must be written in Java/Scala [1]. What is the recommended
 approach for interoperating between custom sinks/sources written in Scala,
 with the Python API? If nothing is currently supported, is it on the road
 map?

 2/ Also, I’ve noted 

Implementing global row_number() ranking with Flink SQL

2021-03-14 Thread Tian Hengyu
While building a real-time data warehouse, we need to implement a global
row_number() with Flink SQL. Could anyone suggest an approach?

The current idea is: apply row_number to the stream and store the result in
HBase; then every incoming record is joined against HBase, and after the
row_number processing the latest result is written back to HBase, i.e. implement a
global row_number() through real-time reads and writes of HBase.

Is this approach feasible? With real-time HBase lookups for the join and then
writing the latest data back to HBase, will efficiency be a problem? Can this meet
the real-time requirements?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

??????????????????flink ????????????????????????kafka,mysql??

2021-03-14 Thread Asahi Lee
??
   ??flink 
??

Can I use PyFlink together with PyTorch/Tensorflow/PyTorch

2021-03-14 Thread Yik San Chan
Hi community,

I am exploring PyFlink and I wonder if it is possible to use PyFlink
together with all these ML libs that ML engineers normally use: PyTorch,
Tensorflow, Scikit Learn, Xgboost, LightGBM, etc.

According to this SO thread,
PySpark cannot use Scikit Learn directly inside a UDF because Scikit Learn
algorithms are not implemented to be distributed, while Spark runs in a
distributed fashion.

Given PyFlink is similar to PySpark, I guess the answer may be "no". But I
would love to double check, and to see what I need to do to make PyFlink
able to define UDFs using these ML libs.
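
For concreteness, the kind of UDF I have in mind would look roughly like this
(a hypothetical sketch; the model file, feature columns and the use of
scikit-learn/joblib are just placeholders, and whether calling a model inside
the UDF like this is safe and efficient is exactly what I am asking):

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf
import joblib  # assumption: the trained model file is available on every Python worker

model = joblib.load("/path/to/model.pkl")

@udf(input_types=[DataTypes.DOUBLE(), DataTypes.DOUBLE()],
     result_type=DataTypes.DOUBLE())
def predict(f1, f2):
    # score a single row with the pre-trained scikit-learn model
    return float(model.predict([[f1, f2]])[0])
```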


(This question is cross-posted on StackOverflow
https://stackoverflow.com/questions/66631859/can-i-use-pyflink-together-with-pytorch-tensorflow-scikitlearn-xgboost-lightgbm
)


Thanks.


Best,

Yik San


Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

2021-03-14 Thread Yang Wang
If the HA related ConfigMaps still exist, then I am afraid the data
located on the distributed storage also still exists.
So I suggest deleting the HA related storage as well.

Deleting all the HA related data manually should help in your current
situation. After that you could recover from the new savepoint.
However, I do not think this is normal behavior. When the
application reaches a terminal state (e.g. FINISHED, FAILED, CANCELLED),
all HA related data should be cleaned up automatically.
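
Roughly, the manual cleanup would look like this (a sketch; <cluster-id> and
the HA storage path are placeholders that have to match your deployment):

```
kubectl delete configmaps --selector='app=<cluster-id>,configmap-type=high-availability'
# plus remove the corresponding data under high-availability.storageDir,
# e.g. the <storageDir>/<cluster-id>/ directory on your blob storage
```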

Could you help to provide the JobManager logs when you are trying to cancel
the job? I believe using `kubectl logs -f {pod_name}` could dump
the logs in real time.

Best,
Yang

Alexey Trenikhun  于2021年3月12日周五 上午12:47写道:

> Hi Yang,
> The upgrade procedure uses POST /jobs/:jobid/savepoints with cancel-job=true,
> but perhaps I hit FLINK-21028. This leads to the question: if the normal
> take-savepoint-and-cancel-job via the API fails, what steps should be done outside
> Flink to be able to resume from the savepoint with a new job version? Is deleting
> the Kubernetes Job and HA configmaps enough, or should something in persisted storage
> be deleted as well?
>
> Thanks,
> Alexey
> --
> *From:* Yang Wang 
> *Sent:* Thursday, March 11, 2021 2:59 AM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: Kubernetes HA - attempting to restore from wrong
> (non-existing) savepoint
>
> Hi Alexey,
>
> From your attached logs, it seems that the leader related config map is
> reused.
> Then the Flink application is recovered instead of submitting a new one.
> This is
> the root cause: it is trying to recover from the wrong savepoint, which was
> specified in
> your last submission.
>
> > So how to fix this?
> If you want to stop the application, I strongly suggest cancelling the
> Flink job with a savepoint
> instead of directly deleting all the K8s resources. After that, you will
> find that the leader
> related config maps are deleted automatically after the job is
> cancelled.
>
> Best,
> Yang
>
> Alexey Trenikhun  于2021年3月10日周三 下午12:16写道:
>
> Hi Yang,
> The problem is re-occurred, full JM log is attached
>
> Thanks,
> Alexey
> --
> *From:* Yang Wang 
> *Sent:* Sunday, February 28, 2021 10:04 PM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: Kubernetes HA - attempting to restore from wrong
> (non-existing) savepoint
>
> Hi Alexey,
>
> It seems that the KubernetesHAService works well since all the checkpoints
> have been cleaned up when the job is canceled.
> And we could find related logs "Found 0 checkpoints in
> KubernetesStateHandleStore{configMapName='gsp--jobmanager-leader'}.".
>
> However, it is a little strange that the CheckpointCoordinator is
> recovering from a wrong savepoint path. Could you share the
> full JobManager logs? One possible reason I could guess is the application
> cluster entrypoint is not creating a new JobGraph from the specified
> arguments.
>
>
> Best,
> Yang
>
> Alexey Trenikhun  于2021年2月27日周六 上午1:48写道:
>
> Hello,
> We have Flink job running in Kubernetes with Kuberenetes HA enabled (JM is
> deployed as Job, single TM as StatefulSet). We taken savepoint with
> cancel=true. Now when we are trying to start job using --fromSavepoint *A*,
> where is *A* path we got from taking savepoint (ClusterEntrypoint reports
> *A* in log), but looks like Job for some reason ignores given *A* and
> actually trying to restore from some path *B* (CheckpointCoordinator logs
> *B* ):
>
> *{"ts":"2021-02-26T17:09:52.500Z","message":" Program
> Arguments:","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> --configDir","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> /opt/flink/conf","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> --fromSavepoint","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-e8a201008f2c
> ","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> --job-classname","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> 

Re: Extracting state keys for a very large RocksDB savepoint

2021-03-14 Thread Andrey Bulgakov
If anyone is interested, I realized that the State Processor API was not the
right tool for this since it spends a lot of time rebuilding RocksDB tables
and then uses a lot of memory trying to read from them. All I really needed was
the operator keys.

So I used SavepointLoader.loadSavepointMetadata to get KeyGroupsStateHandle
objects and built an InputFormat heavily based on the code I found
in RocksDBFullRestoreOperation.java.

It ended up working extremely quickly while keeping memory and CPU usage at
the minimum.

On Tue, Mar 9, 2021 at 1:51 PM Andrey Bulgakov  wrote:

> Hi all,
>
> I'm trying to use the State Processor API to extract all keys from a
> RocksDB savepoint produced by an operator in a Flink streaming job into CSV
> files.
>
> The problem is that the storage size of the savepoint is 30TB and I'm
> running into garbage collection issues no matter how much memory in
> different proportions or CPU cores I allocate to task managers. (I tried
> allocating up to 120GB and 16 cores to each task).
>
> The same program and hardware configuration works with no problems for a
> smaller savepoint (300GB), it's some sort of a scalability issue here.
>
> At the beginning the tasks spend a couple hours in what I call "the
> download phase". During that phase heap usage as indicated by metrics and
> Flink UI is at about 10% and everything is going great.
>
> But at a certain point heap usage for tasks coming out of the download phase
> starts to go up, climbs up to about 87% usage as indicated in Flink UI and
> by the "tm.Status.JVM.Memory.Heap.Used" metric. At that point the heap
> usage metric doesn't increase anymore and JVM starts spending a lot of time
> collecting garbage and keeping all CPUs 100% loaded. After some time in
> this mode the job crashes with "java.util.concurrent.TimeoutException:
> Heartbeat of TaskManager with id container_1614821414188_0002_01_35
> timed out."
>
> At all times the indicated managed memory usage is 0%. Which seems
> suspicious since RocksDB is supposed to be using it?
>
> Also, judging by the lack of an application metric I have in the state
> processor operator, KeyedStateReaderFunction.readKey never gets called.
>
> I would appreciate if somebody helped answering some of my questions or
> suggested a way I could further diagnose/fix this:
>
> 1. Is it normal that this overwhelming garbage collection starts long
> before reaching 100% heap usage? At the time it happens there's usually
> 10-15GB of heap showing up as available.
>
> 2. Am I correct to assume that even in batch mode Flink implements memory
> back pressure and is supposed to slow down processing/allocations when it's
> low on available heap memory?
>
> 3. If #2 is true, is it possible that due to some misconfiguration Flink
> considers more heap space to be available than there actually is and keeps
> allocating even though there's no more heap?
>
> 4. As an alternative to #3, is it possible that there are some unaccounted
> heap allocations that are not shown in the UI and by the metric and
> therefore not taken into account by the memory back pressure mechanism?
>
> Here's the minimal code example that demonstrates the issue:
> https://gist.github.com/andreiko/94c675b4f04b40144b4cb4474b2f050f
>
> I'm running this on Flink 1.12.2 (and many earlier versions, too) with the
> following base configuration and parallelism of 80 (tried lowering that to
> have more resources available, too):
> https://gist.github.com/andreiko/305d77c23be605042b85d9d4eb63f025
>
> I tried many things with no success:
> - reducing parallelism and making more resources available to each task
> manager
> - enabling object reuse and modifying the tuple mapper to avoid extra
> tuple allocations
> - manipulating memory ratios to allocate more memory to be used as heap,
> managed
> - allocating 20% of memory for JVM overhead
> - switching to G1GC garbage collector
>
> Again, would appreciate any help with this.
>
> --
> With regards,
> Andrey Bulgakov
>


-- 
With regards,
Andrey Bulgakov


1.12 yarn-per-job job submission fails

2021-03-14 Thread smq
When I submit with this command, I get
flink Application rejected by queue placement policy
which suggests no queue was specified.
But I did add the -yqu parameter to the command, and when I check the queue in the web UI it is not the one I specified but default.
Also, the job runs fine when I submit it with the old command. Has anyone run into this problem?

Re: Question about session_aggregate.merging-window-set.rocksdb_estimate-num-keys

2021-03-14 Thread Vishal Santoshi
All I can think of is that any update on a state key, which I do in my
ProcessFunction, creates an update (essentially an append in RocksDB)
which renders the previous value for the key a tombstone, but that
need not be reflected in the count (hence the double or triple counts)
atomically, thus it is called an "estimate". I was just not anticipating this
much difference ...

On Sun, Mar 14, 2021 at 5:32 PM Vishal Santoshi 
wrote:

> The reason I ask is that I have a "Process Window Function" on that
> Session  Window  and I keep key scoped Global State.  I maintain a TTL on
> that state ( that is outside the Window state )  that is roughly the
> current WM + lateness.
>
> I would imagine that keys for that custom state are *roughly* equal to
> the number of keys in the "merging-window-set" . It seems twice that number
> but does follow the slope. I am trying to figure out why this deviation.
>
> public void process(KEY key,
> ProcessWindowFunction, KeyedSessionWithSessionID<
> KEY, VALUE>, KEY, TimeWindow>.Context context,
> Iterable> elements, Collector<
> KeyedSessionWithSessionID> out)
> throws Exception {
> // scoped to the key
> if (state.value() == null) {
> this.newKeysInState.inc();
> state.update(new IntervalList());
> }else{
> this.existingKeysInState.inc();
> }
>
> On Sun, Mar 14, 2021 at 3:32 PM Vishal Santoshi 
> wrote:
>
>> Hey folks,
>>
>>   Was looking at this very specific metric
>> "session_aggregate.merging-window-set.rocksdb_estimate-num-keys".  Does
>> this metric also represent session windows ( it is a session window ) that
>> have lateness on them ? In essence if the session window was closed but has
>> a lateness of a few hours would those keys still be counted against this
>> metric.
>>
>> I think they should as it is an estimate keys for the Column Family for
>> the operator and if the window has not been GCed then the key for those
>> Windows should be in RocksDB but wanted to be sure.
>>
>> Regards.
>>
>>
>>


Re: Question about session_aggregate.merging-window-set.rocksdb_estimate-num-keys

2021-03-14 Thread Vishal Santoshi
The reason I ask is that I have a "Process Window Function" on that
Session Window and I keep key-scoped Global State. I maintain a TTL on
that state (which is outside the Window state) that is roughly the
current WM + lateness.

I would imagine that the keys for that custom state are *roughly* equal to the
number of keys in the "merging-window-set". It seems to be twice that number but
does follow the slope. I am trying to figure out why this deviation occurs.

public void process(KEY key,
ProcessWindowFunction, KeyedSessionWithSessionID<
KEY, VALUE>, KEY, TimeWindow>.Context context,
Iterable> elements, Collector<
KeyedSessionWithSessionID> out)
throws Exception {
// scoped to the key
if (state.value() == null) {
this.newKeysInState.inc();
state.update(new IntervalList());
}else{
this.existingKeysInState.inc();
}

On Sun, Mar 14, 2021 at 3:32 PM Vishal Santoshi 
wrote:

> Hey folks,
>
>   Was looking at this very specific metric
> "session_aggregate.merging-window-set.rocksdb_estimate-num-keys".  Does
> this metric also represent session windows ( it is a session window ) that
> have lateness on them ? In essence if the session window was closed but has
> a lateness of a few hours would those keys still be counted against this
> metric.
>
> I think they should as it is an estimate keys for the Column Family for
> the operator and if the window has not been GCed then the key for those
> Windows should be in RocksDB but wanted to be sure.
>
> Regards.
>
>
>


Re: Evenly Spreading Out Source Tasks

2021-03-14 Thread Chesnay Schepler
Is this a brand-new job, with the cluster having all 18 TMs at the time 
of submission? (or did you add more TMs while the job was running)


On 3/12/2021 5:47 PM, Aeden Jameson wrote:

Hi Matthias,

Yes, all the task managers have the same hardware/memory configuration.

Aeden

On Fri, Mar 12, 2021 at 3:25 AM Matthias Pohl  wrote:

Hi Aeden,
just to be sure: All task managers have the same hardware/memory configuration, 
don't they? I'm not 100% sure whether this affects the slot selection in the 
end, but it looks like this parameter has also an influence on the slot 
matching strategy preferring slots with less utilization of resources [1].

I'm gonna add Chesnay to the thread. He might have more insights here. @Chesnay 
are there any other things that might affect the slot selection when actually 
trying to evenly spread out the slots?

Matthias

[1] 
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java#L141

On Fri, Mar 12, 2021 at 12:58 AM Aeden Jameson  wrote:

Hi Arvid,

   Thanks for responding. I did check the configuration tab of the job
manager and the setting cluster.evenly-spread-out-slots: true is
there. However I'm still observing unevenness in the distribution of
source tasks. Perhaps this additional information could shed light.

Version: 1.12.1
Deployment Mode: Application
Deployment Type: Standalone,  Docker on Kubernetes using the Lyft
Flink operator https://github.com/lyft/flinkk8soperator

I did place the setting under the flinkConfig section,

apiVersion: flink.k8s.io/v1beta1

spec:
   flinkConfig:
 cluster.evenly-spread-out-slots: true
 high-availability: zookeeper
 ...
 state.backend: filesystem
 ...
   jobManagerConfig:
 envConfig:
 

Would you explain how the setting ends up evenly distributing active
Kafka consumers? Is it a result of just assigning tasks to TM1, TM2,
TM3 ... TM18 in order and starting again? In my case I have 36
partitions and 18 nodes so after the second pass in assignment I would
end up with 2 subtasks in the consumer group on each TM. And then
subsequent passes result in inactive consumers.


Thank you,
Aeden

On Thu, Mar 11, 2021 at 5:26 AM Arvid Heise  wrote:

Hi Aeden,

the option that you mentioned should have actually caused your desired 
behavior. Can you double-check that it's set for the job (you can look at the 
config in the Flink UI to be 100% sure).

Another option is to simply give all task managers 2 slots. In that way, the 
scheduler can only evenly distribute.
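
For the setup described in this thread (18 TMs, 36 partitions), that would mean setting roughly the following in the Flink configuration (a sketch; with 2 slots per TM you get exactly 36 slots in total, so the rest of the job has to fit into them):

```
taskmanager.numberOfTaskSlots: 2
cluster.evenly-spread-out-slots: true
```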

On Wed, Mar 10, 2021 at 7:21 PM Aeden Jameson  wrote:

 I have a cluster with 18 task managers, 4 task slots each, running a
job whose source/sink(s) are declared with FlinkSQL using the Kafka
connector. The topic being read has 36 partitions. The problem I'm
observing is that the subtasks for the sources are not evenly
distributed. For example, 1 task manager will have 4 active source
subtasks and other TMs none. Is there a way to force each task
manager to have 2 active source subtasks? I tried using the setting
cluster.evenly-spread-out-slots: true , but that didn't have the
desired effect.

--
Thank you,
Aeden





Question about session_aggregate.merging-window-set.rocksdb_estimate-num-keys

2021-03-14 Thread Vishal Santoshi
Hey folks,

  I was looking at this very specific metric,
"session_aggregate.merging-window-set.rocksdb_estimate-num-keys". Does
this metric also represent session windows (it is a session window) that
have lateness on them? In essence, if the session window was closed but has
a lateness of a few hours, would those keys still be counted against this
metric?

I think they should, as it is an estimate of the keys for the operator's Column
Family, and if the window has not been GCed then the keys for those windows
should still be in RocksDB, but I wanted to be sure.

Regards.


Re: Questions with State Processor Api

2021-03-14 Thread Maminspapin
Please, can someone help me understand whether the State Processor API is a
solution for this task or not?

I want to add certain target user actions to an 'Events' state and remove them
if a cancel action is received.

Every period X I need to check this state to see whether it's time to
communicate with the user. If yes, do so and update the 'Events' state,
and also update the 'Users' state.

I assume the State Processor API could help me manage these stores.
So when period X comes, I take a savepoint, which is the full state of the
application, and load the 'Events' and 'Users' states. Now I can query 'Events'
with SQL and check whether it's time to communicate with the user, and update the
states if needed.

Or is this idea not good and Flink not suitable for the task?

Thanks,
Yuri L.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Handling Bounded Sources with KafkaSource

2021-03-14 Thread Maciej Obuchowski
Hey Rion,

We solved this issue by using the usual, unbounded streams, and using the
Awaitility library to express the conditions that end the test - for
example, particular data being present in a table.

IMO this type of testing has the advantage that you won't see behavior that
diverges from production, as you have experienced.

Regards,
Maciej



On Sun, Mar 14, 2021, 05:41 Rion Williams  wrote:

> Following up on this issue, I realized my initial problem was that my test
> case only contained a single message to send through the pipeline. This
> resulted in the earliest offset also being the latest and things didn’t
> exactly work as expected. Once I added several other messages and sent them
> through, the pipeline appeared to run as expected.
>
> However, the use of “bounded” seems to be fickle in terms of test cases.
> Since an exception is thrown once the bound is reached, I can typically
> just wrap my test execution within a try/catch and simply apply my
> assertion afterwards.
>
> This occasionally results in passing tests, but in others, it seems that
> the bound is reached prior to processing the messages it had seen thus far,
> and as a result yields a failing test. I don’t know if this is a bug, or
> intentional, but I’m not aware of a workaround that could “force” the
> pipeline to finish processing all of the messages from the topic once the
> bound is reached. I’ve tried sending through “flush records” to the topic,
> however since there are multiple partitions, it’s not guaranteed that the
> pipeline will read those last.
>
> This is purely a testing problem, as a production job would be streaming
> and unbounded, however I’d love to have a reliable integration test or a
> pattern that I could use to guarantee the processing of a finite set of
> data via a KafkaSource (I.e. send finite records to Kafka, read from topic,
> process all records, apply assertion after processing).
>
> Any ideas/recommendations/workarounds would be greatly welcome and I’d be
> happy to share my specific code / use-cases if needed.
>
> Thanks much,
>
> Rion
>
> On Mar 12, 2021, at 10:19 AM, Rion Williams  wrote:
>
> 
> Hi all,
>
> I've been using the KafkaSource API as opposed to the classic consumer and
> things have been going well. I configured my source such that it could be
> used in either a streaming or bounded mode, with the bounded approach
> specifically aimed at improving testing (unit/integration).
>
> I've noticed that when I attempt to run through a test - it seems that the
> pipeline never acknowledges the "end" of the stream in a bounded context
> and just runs forever and never makes it to my assert.
>
> Does anything look glaringly wrong with how the source is being defined?
>
> object KafkaEventSource {
>
> fun withParameters(parameters: ParameterTool): KafkaSource {
> val schemaRegistryUrl = parameters.getRequired("schema.registry.url")
>
> val builder = KafkaSource.builder()
> .setBootstrapServers(parameters.getRequired("bootstrap.servers"))
> .setGroupId(parameters.getRequired("group.id"))
> .setStartingOffsets(OffsetsInitializer.earliest())
> .setProperty("schema.registry.url", schemaRegistryUrl)
> .setTopics(parameters.getRequired("topic"))
> .setDeserializer(EventDeserializer(schemaRegistryUrl))
>
> if (parameters.getBoolean("bounded", false)) {
> builder.setBounded(OffsetsInitializer.latest())
> }
>
> return builder.build()
> }
> }
>
> I can verify that the generated source has it's boundedness set properly
> and all of the configuration options are correct.
>
> My test itself is fairly simple and can be broken down as follows:
>
>1. Inject records into a Kafka Topic
>2. Initialize my Flink job using all of my testing parameters
>3. Apply my assertion (in this case verifying that a JdbcSink wrote to
>a specific database)
>
> @Test
> fun `Example `(){
> // Arrange
> val events = getTestEvents()
> sendToKafka(events, parameters)
>
> // Act
> EntityIdentificationJob.run(parameters)
>
> // Assert
> val users = queryCount("SELECT * FROM users", connection)
> assertEquals(1, users)
> }
>
> Where my job itself is broken down further and reads from the source,
> performs a process function into multiple side outputs and writes each of
> them to a distinct JdbcSink based on the type:
>
> @JvmStatic
> fun main(args: Array) {
> val parameters = loadParams(args)
> val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>
> // Read from Kafka
> val entities = stream
>.fromSource(KafkaEventSource.withParameters(parameters), 
> WatermarkStrategy.noWatermarks(), "kafka")
>.process(IdentifyEntitiesFunction())
>
> // Write out each tag to its respective sink
> for (entityType in EntityTypes.all) {
> entities
> .getSideOutput(entityType)
> 

Some questions about using PyFlink

2021-03-14 Thread qian he
Hi,

Our project would like to use Flink for distributed computation. The existing
project is a Python pandas project, and we want to try migrating it to PyFlink.
When using DataSet for batch processing, the Python API has no map/reduce
functions corresponding to the Java version, so I have the following questions:
1. Does the Python Flink SDK not support DataSet yet?
2. Is there an alternative?
3. If it is not supported yet, is there a planned date for supporting it?
4. Why doesn't the Flink Table API support map/reduce operations?
5. Our project uses DataFrames to process data; can that be run on Flink for
distributed computation? A DataFrame can be converted directly into a Table, but
Table does not support map/reduce operations. Are there any recommendations for
migrating a pandas project to Flink?
6. Why doesn't the DataStream API implement the window methods? Will later
versions support them?

Many thanks. We are very optimistic about Flink and hope the community keeps
growing. Thanks for your hard work!