Re: Emit message at start and end of event time session window

2020-03-26 Thread Manas Kale
Hi Till,
Thank you for the explanation, I understand the behaviour now.


On Thu, Mar 26, 2020 at 9:23 PM Till Rohrmann  wrote:

> A quick update concerning your observations. The reason why you are seeing
> the unordered output is because in the gist we used
> a AssignerWithPeriodicWatermarks which generates watermarks periodically.
> Due to this aspect, it can happen that Flink already processes all elements
> up to "20" before it sees the next watermark which triggers the processing.
> If there are multiple windows being processed, Flink does not give a
> guarantee in which order this happens.
>
> You can avoid this behaviour if you use
> an AssignerWithPunctuatedWatermarks instead. This watermark assigner is
> called for every record. The updated gist [1] shows how it is used.
>
> [1] https://gist.github.com/tillrohrmann/dda90b8b0e67e379a8dfee967fbd9af1
>
> Cheers,
> Till
>
> On Thu, Mar 26, 2020 at 4:27 PM Till Rohrmann 
> wrote:
>
>> Hmm, I might have given you a bad advice. I think the problem becomes
>> harder because with Flink's window and trigger API we need to keep state
>> consistent between the Trigger and the Window function. Maybe it would be
>> easier to not rely on the windowing mechanism and instead to use Flink's
>> process function [1] to implement the logic yourself.
>>
>> With the process function you have basically a low level API with which
>> you can implement an operator which groups incoming events according to
>> sessions and outputs the required information.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>
>> Cheers,
>> Till
>>
>> On Thu, Mar 26, 2020 at 11:27 AM Manas Kale 
>> wrote:
>>
>>> Hi Till,
>>> I see, thanks for the clarification.
>>> Assuming all other settings are the same, if I generate events as follows
>>> :
>>> Element.from("1", 1000L),
>>> Element.from("2", 2000L),
>>> Element.from("3", 3000L),
>>> Element.from("10", 10000L)
>>> ,Element.from("11", 11000L),
>>> Element.from("12", 12000L),
>>> Element.from("20", 20000L)
>>> we will expect 2 session windows to be created {1,2,3} and {10,11,12}
>>> with appropriate messages. However, when I run this, there seems to be a
>>> problem in the valueState of MyWindowFunction. Apparently that state is
>>> being shared by both the session windows, which leads to incorrect results.
>>> To solve this, I replaced it with a MapState. The Long is
>>> the start timestamp of a window, something that can uniquely identify
>>> different windows. This works but with one caveat : if we have two
>>> subsequent windows, the ordering of messages is :
>>>
>>> window1 started @ 1000 -> window2 started @ 10000 -> window1 ended @
>>> 8000 -> window2 ended @ 17000
>>>
>>> whereas I expect it to be :
>>> window1 started @ 1000 -> window1 ended @ 8000 -> window2 started @
>>> 10000 -> window2 ended @ 17000
>>>
>>> I thought Flink would execute event time timers and process events in
>>> chronological event time order. However, it seems that the onEventTime()
>>> invocation of window1 is called *after *elements from window2 have been
>>> processed even though window1's onEventTime() is earlier in event time.
>>>
>>> Is my approach and reasoning correct? Also, is it possible to get the
>>> messages in the expected order?
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Mar 26, 2020 at 2:55 PM Till Rohrmann 
>>> wrote:
>>>
 Hi Manas,

 the problem is that the print() statement is being executed with a
 different parallelism than 1. Due to this fact, the messages coming from
 the window function will be sent in round-robin fashion to the print
 operators. If you remove the setParallelism(1) from the window function,
 then the window function will be executed with the same parallelism as the
 print operator. Due to this fact, there is no round-robin distribution of
 the events but every window function task will simply forward its
 elements to its print operator task. You should be able to see these
 topology differences in the web ui.

 You could configure the print() operator to run with a parallelism of 1
 as well by adding a setParallelism(1) statement to it.

 Cheers,
 Till

 On Thu, Mar 26, 2020 at 7:11 AM Manas Kale 
 wrote:

> Hi Till,
> When I run the example code that you posted, the order of the three
> messages (window started, contents of window and window ended) is
> non-deterministic. This is surprising to me, as setParallelism(1) has been
> used in the pipeline - I assumed this should eliminate any form of race
> conditions for printing. What's more is that if I *remove*
> setParallelism(1) from the code, the output is deterministic and correct
> (i.e. windowStarted -> windowContents -> windowEnded).
>
> Clearly, something is wrong with my understanding. What is it?

Re: Issue with Could not resolve ResourceManager address akka.tcp://flink

2020-03-26 Thread Zhu Zhu
Hi Vitaliy,

>> *Cannot serve slot request, no ResourceManager connected*
This is not a problem, just that the JM needs RM to be connected to send
slot requests.

>> *Could not resolve ResourceManager address
akka.tcp://flink@prod-bigd-dn11:43757/user/resourcemanager*
This should be the root cause. Would you check whether the hostname
*prod-bigd-dn11* is resolvable? And whether the port 43757 of that machine
is permitted to be accessed?

Thanks,
Zhu Zhu

Vitaliy Semochkin wrote on Fri, Mar 27, 2020 at 1:54 AM:

> Hi,
>
> I'm facing an issue similar to
> https://issues.apache.org/jira/browse/FLINK-14074
> Job starts and then yarn logs report "*Could not resolve ResourceManager
> address akka.tcp://flink*"
>
> A fragment from yarn logs looks like this:
>
> LazyFromSourcesSchedulingStrategy]
> 16:54:21,279 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>- Job Flink Java Job at Thu Mar 26 16:54:09 CET 2020
> (9817283f911d83a6d278cc39d17d6b11) switched from state CREATED to RUNNING.
> 16:54:21,287 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>- CHAIN DataSource (MailEvent; EMC; 2019-12-01 - 2020-01-01; null -
> 157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
> Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
> FlatMap (Get mail item by EMC event) -> Map (Map IntraregionalVolumeItem
> data set from EMC events) (1/3) (5482b0e6ae1d64d9b0918ec15599211f) switched
> from CREATED to SCHEDULED.
> 16:54:21,287 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>- CHAIN DataSource (MailEvent; EMC; 2019-12-01 - 2020-01-01; null -
> 157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
> Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
> FlatMap (Get mail item by EMC event) -> Map (Map IntraregionalVolumeItem
> data set from EMC events) (2/3) (5c993710423eea47ae66f833b2999530) switched
> from CREATED to SCHEDULED.
> 16:54:21,287 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>- CHAIN DataSource (MailEvent; EMC; 2019-12-01 - 2020-01-01; null -
> 157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
> Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
> FlatMap (Get mail item by EMC event) -> Map (Map IntraregionalVolumeItem
> data set from EMC events) (3/3) (23cfa30fba857b2c75ba76a21c7d4972) switched
> from CREATED to SCHEDULED.
> 16:54:21,287 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>- CHAIN DataSource (MailEvent; EMD; 2019-12-01 - 2020-01-01; null -
> 157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
> Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
> FlatMap (Get mail item by EMD event) -> Map (Map IntraregionalVolumeItem
> data set from EMD events) (1/3) (7cc8a395b87e82000184724eb1698ace) switched
> from CREATED to SCHEDULED.
> 16:54:21,288 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>- CHAIN DataSource (MailEvent; EMD; 2019-12-01 - 2020-01-01; null -
> 157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
> Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
> FlatMap (Get mail item by EMD event) -> Map (Map IntraregionalVolumeItem
> data set from EMD events) (2/3) (5edfe3d1f509856d17fa0da078cb3f7e) switched
> from CREATED to SCHEDULED.
> 16:54:21,288 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>- CHAIN DataSource (MailEvent; EMD; 2019-12-01 - 2020-01-01; null -
> 157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
> Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
> FlatMap (Get mail item by EMD event) -> Map (Map IntraregionalVolumeItem
> data set from EMD events) (3/3) (dd3397f889a3fad1acf4c59f59a93d92) switched
> from CREATED to SCHEDULED.
> 16:54:21,297 INFO
>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Cannot
> serve slot request, no ResourceManager connected. Adding as pending request
> [SlotRequestId{b4c6e7357e4620bf2e997c46d7723eb1}]
> 16:54:21,301 INFO
>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Cannot
> serve slot request, no ResourceManager connected. Adding as pending request
> [SlotRequestId{841bbb79b01b5e0d9ae749a03f65c303}]
> 16:54:21,301 INFO
>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Cannot
> serve slot request, no ResourceManager connected. Adding as pending request
> [SlotRequestId{496120465d541ea9fd2ffcec89e2ac3b}]
> 16:54:21,304 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>- Connecting to ResourceManager akka.tcp://
> fl...@prod-bigd-dn11.net:43757/user/resourcemanager()
> 16:54:21,307 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>- Could not resolve ResourceManager address
> akka.tcp://flink@prod-bigd-dn11:43757/user/resourcemanager, retrying in
> 

Re: How to control the output frequency of Flink SQL results

2020-03-26 Thread Jun Zhang
hi:
You can implement a custom trigger [1].
The second scenario is doable; I have not run into the first scenario myself, but you can give it a try.

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html

flink小猪 <18579099...@163.com> wrote on Fri, Mar 27, 2020 at 11:29 AM:

> I have two requirements:
> 1. If I open a one-minute window but want to output results every 5 minutes (outputting the per-minute aggregation results, i.e. 5 rows of aggregated results), what should I do?
> 2. If I open a one-hour window but want to output results every 5 minutes (the window's data has not all arrived yet, so output the currently aggregated partial result first), what should I do?
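
For the second scenario, at the DataStream level the built-in ContinuousEventTimeTrigger already gives periodic early firings without writing a fully custom trigger. A minimal sketch (the key selector and MyAggregate function are placeholders, not taken from this thread):

// classes: org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows,
//          org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger,
//          org.apache.flink.streaming.api.windowing.time.Time
input
    .keyBy(r -> r.getKey())                                   // assumed key accessor
    .window(TumblingEventTimeWindows.of(Time.hours(1)))       // the 1-hour window of scenario 2
    .trigger(ContinuousEventTimeTrigger.of(Time.minutes(5)))  // fire a partial result every 5 minutes
    .aggregate(new MyAggregate());                            // MyAggregate: your AggregateFunction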


Re: How to move event time forward using externally generated watermark message

2020-03-26 Thread Manas Kale
Thanks for the help, Arvid!

On Tue, Mar 24, 2020 at 1:30 AM Arvid Heise  wrote:

> Hi Manas,
>
> both are valid options.
>
> I'd probably add a processing time timeout event in a process function,
> which will only trigger after no event has been received for 1 minute. In
> this way, you don't need to know which devices there are and just enqueue
> one timer per key (=device id).
>
> After the process function, you'd need to reapply your watermark assigner
> as processing time and event time usually don't mix well and need to be
> explicitly resolved.
>
> After the assigner, you can then simply filter out the timeout event and
> don't need to care in downstream operations.
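
(For illustration, a rough sketch of that timeout idea; the Event type, its timeoutMarker() factory, the String key and the 1-minute timeout are assumptions, not code from this thread:)

// (imports as for any KeyedProcessFunction: KeyedProcessFunction, ValueState,
//  ValueStateDescriptor, Configuration, Collector)
public class IdleTimeout extends KeyedProcessFunction<String, Event, Event> {

    private static final long TIMEOUT = 60_000L;                // 1 minute of processing time

    private transient ValueState<Long> registeredTimer;

    @Override
    public void open(Configuration parameters) {
        registeredTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("registeredTimer", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        out.collect(event);                                      // forward the real event unchanged
        Long previous = registeredTimer.value();
        if (previous != null) {
            ctx.timerService().deleteProcessingTimeTimer(previous);
        }
        long timeoutAt = ctx.timerService().currentProcessingTime() + TIMEOUT;
        ctx.timerService().registerProcessingTimeTimer(timeoutAt);
        registeredTimer.update(timeoutAt);                        // one pending timer per key (device id)
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        // no event for TIMEOUT ms: emit a synthetic marker; re-assign watermarks after this
        // operator and filter the marker out again downstream
        out.collect(Event.timeoutMarker(ctx.getCurrentKey()));
        registeredTimer.clear();
    }
}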
>
> On Mon, Mar 23, 2020 at 11:42 AM Manas Kale  wrote:
>
>> Hi,
>> I have a scenario where I have an input event stream from various IoT
>> devices. Every message on this stream can be of some eventType and has an
>> eventTimestamp. Downstream, some business logic is implemented on this
>> based on event time.
>> In case a device goes offline, what's the best way to indicate to this
>> system that event time has progressed? Should I:
>>
>>- Send a special message that contains only event time information,
>>and write code to handle this message in all downstream operators?
>>
>>
>>- Implement some processing time timer in the system that will tick
>>the watermark forward if we don't see any message for some duration? I 
>> will
>>still need to write code in downstream operators that handles this timer's
>>trigger message.
>>
>> I would prefer not writing code to handle special watermark messages. So
>> does Flink provide any API level call that I can use to tick the watermark
>> forward for all downstream operators when this special message is received
>> / timer is fired?
>>
>


Re: [udf questions]

2020-03-26 Thread WuPangang
ERROR log:
.
Job has been submitted with JobID 91ac323d4d5338418883240680192f34
Traceback (most recent call last):
  File "", line 1, in 
  File 
"/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884pyflink.zip/pyflink/table/table_environment.py",
 line 907, in execute
  File 
"/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
 line 1286, in __call__
  File 
"/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884pyflink.zip/pyflink/util/exceptions.py",
 line 147, in deco
  File 
"/tmp/dee3edf4-08ff-4c2c-87aa-fef573fae884py4j-0.10.8.1-src.zip/py4j/protocol.py",
 line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o23.execute.
: java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
91ac323d4d5338418883240680192f34)
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at 
org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: 91ac323d4d5338418883240680192f34)
at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=50, 
backoffTimeMS=60000)
at 

How to control the output frequency of Flink SQL results

2020-03-26 Thread flink小猪
I have two requirements:
1. If I open a one-minute window but want to output results every 5 minutes (outputting the per-minute aggregation results, i.e. 5 rows of aggregated results), what should I do?
2. If I open a one-hour window but want to output results every 5 minutes (the window's data has not all arrived yet, so output the currently aggregated partial result first), what should I do?

Re: [udf questions]

2020-03-26 Thread WuPangang
Thanks a lot for the reply.
Following the hints in the email, I tried the following:

@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
def str_add(str_name):
  return '1'
table_env.register_function("str_add", str_add)
table_env.sql_update("insert into flink_sinktable_ad_test_1 \
select \
str_add(topicid) AS topicid \
from \
flink_sourcetable_ad_test \
")
Goal: I wanted to verify, in the simplest possible way, whether the UDF takes effect.
Result: there is still no data flowing in.
Other checks: I verified the data flow without using the UDF, and it is normal.


So could you help analyze this further? Or how should I dig deeper into it?


all code below:
from pyflink.datastream import 
StreamExecutionEnvironment,CheckpointingMode,TimeCharacteristic
from pyflink.table import StreamTableEnvironment, 
EnvironmentSettings,TableSink,TableConfig,DataTypes
from pyflink.table.descriptors import 
Schema,Kafka,Rowtime,ConnectTableDescriptor,CustomConnectorDescriptor,Json
from pyflink.common import RestartStrategies
from pyflink.table.udf import udf
import json

env = StreamExecutionEnvironment.get_execution_environment()
## container settings
env.set_parallelism(12)
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(50,60000))
## use the blink planner API
environment_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
table_env = 
StreamTableEnvironment.create(env,environment_settings=environment_settings)

table_env.sql_update("CREATE TABLE flink_sourcetable_ad_test ( \
host STRING, \
type STRING, \
topicid STRING, \
message STRING, \
proctime as PROCTIME() \
) WITH ( \
  'connector.type' = 'kafka',\
  'connector.version' = 'universal', \
  'connector.topic' = 'advertise_module',  \
  'connector.properties.zookeeper.connect' = 'localhost:2181', \
  'connector.properties.bootstrap.servers' = '172.25.80.134:9092', \
  'connector.properties.group.id' = 'flink_1.10_test_source', \
  'connector.startup-mode' = 'latest-offset', \
  'format.type' = 'json' \
)")

table_env.sql_update("CREATE TABLE flink_sinktable_ad_test_1 ( \
topicid STRING \
) WITH ( \
  'connector.type' = 'kafka',\
  'connector.version' = 'universal', \
  'connector.topic' = 'recommend_user_concern_test',  \
  'connector.properties.zookeeper.connect' = 'localhost:2181', \
  'connector.properties.bootstrap.servers' = '172.25.82.77:9092', \
  'connector.properties.group.id' = 'flink_1.10_test_sink', \
  'connector.startup-mode' = 'latest-offset', \
  'connector.properties.retries' = '3', \
  'format.type' = 'json', \
  'connector.properties.update_mode' = 'append' \
)")
@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
def str_add(str_name):
  return '1'
table_env.register_function("str_add", str_add)
#table_env.register_function("str_add", udf(lambda i: i + '1', 
DataTypes.STRING(), DataTypes.STRING()))
table_env.sql_update("insert into flink_sinktable_ad_test_1 \
select \
str_add(topicid) AS topicid \
from \
flink_sourcetable_ad_test \
")
table_env.execute('flink_1.10_test')

--
> On Mar 26, 2020, at 5:55 PM, jincheng sun wrote:
> 
> One fairly obvious problem is in the UDF definition: your sink table defines a STRING field and your SQL SELECTs the result of the udf, so your UDF should also return a STRING, i.e. result_type=DataTypes.STRING(). Also make sure the udf really returns a string, not a JSON object.
> 
> Best,
> Jincheng
> 
> 
> WuPangang <wpangang1...@icloud.com>
> wrote on Thu, Mar 26, 2020 at 5:24 PM:
> Data as below:
>  
> {"host":"172.25.69.145","@timestamp":"2020-03-23T09:21:16.315Z","@version":"1","offset":532178261,"logmark":"advertise-api","type":"squirrel","path":"/home/logs/app/action_log/advertise_api/AdStatistic.2020-03-23.log","message":"{\"date_time\":\"2020-03-23
>  
> 17:21:15\",\"serverTime\":1584955275,\"currentTime\":\"1584955274\",\"distinct_id\":\"734232168\",\"device_id\":\"ad2c15b7cf910cc6\",\"event\":\"view_material\",\"click_pos\":\"ad_view_avatar\",\"material_name\":\"\\u5934\\u50cf\\u66dd\\u5149\",\"phase\":\"prod\",\"source\":\"phone\",\"client_v\":\"2.100\",\"platform\":\"android\",\"ip\":\"39.84.23.81\",\"network\":\"WIFI\",\"idfa\":\"867179032526091\",\"unique_device_id\":\"ad2c15b7cf910cc6\",\"manufacturer\":\"HUAWEI\",\"model\":\"PRA-AL00X\",\"carrier\":\"\",\"local_dns\":\"0.0,0.0\",\"user_id\":\"734232168\",\"is_login\":1,\"url_path\":\"http:\\/\\/down-ddz.734399.com
>  
> 

Reply: (no subject)

2020-03-26 Thread 被惊艳的时光
The parallelism is 200 and 400; the cluster has 270+ nodes, with about 6600 available vcores and roughly 17 TB of memory. Looking at the execution graph, q43 has a data-skew problem: the failed nodes receive a disproportionately large amount of data.



--- Original message ---
From: "Jingsong Li"

Re: (no subject)

2020-03-26 Thread Jingsong Li
Hi,

- Could it be a matter of compute scale?
Is the cluster size appropriate? Is the parallelism appropriate?

- Could it be a suboptimal plan?
Have the Hive tables been analyzed (statistics collected)?

CC: user

Best,
Jingsong Lee

On Thu, Mar 26, 2020 at 8:27 PM 被惊艳的时光 <2521929...@qq.com> wrote:

>
> Hello, I have a question about the flink-sql-benchmark tool. When running the TPC-DS test with the data volume at 4 TB (Flink 1.10), the three queries q43, q67 and q70 failed, all during the hash join; the error says the hash join iterated too many times. Did you ever hit this in your tests?
>


-- 
Best, Jingsong Lee


Re: usage of ClusterSpecificationBuilder.taskManagerMemoryMB

2020-03-26 Thread Vitaliy Semochkin
Got it, thank you very much for the reply.
So far we can not avoid using ClusterSpecification because
 clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph... )
depends on it.

Best Regards,
Vitaliy

On Tue, Mar 24, 2020 at 5:24 AM Xintong Song  wrote:

> Hi Vitaliy,
>
> After FLIP-49, ClusterSpecification.taskManagerMemoryMB is no longer
> necessary. It can be completely replaced by
> `taskmanager.memory.process.size`. It is kept merely for legacy reasons.
>
> I'm actually thinking about removing ClusterSpecification, maybe after
> finishing FLIP-116 [1], which replaces masterMemoryMB with
> `jobmanager.memory.process.size`. That would also involve refactoring
> YarnClusterDescriptor, which is not in good shape (e.g. the method
> startAppMaster has more than 400 lines) and is closely coupled with
> ClusterSpecification.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Mar 24, 2020 at 5:59 AM Vitaliy Semochkin 
> wrote:
>
>> Hi,
>>
>> what ClusterSpecificationBuilder.taskManagerMemoryMB is for in flink 1.10?
>> Its only usage I see is in YarnClusterDescriptor.validateClusterResources
>> and I do not get the meaning of it.
>> How is it different from taskmanager.memory.process.size?
>> And what's the point of having it, if it's not used anywhere?
>>
>> Regards,
>> Vitaliy
>>
>


Issue with Could not resolve ResourceManager address akka.tcp://flink

2020-03-26 Thread Vitaliy Semochkin
Hi,

I'm facing an issue similar to
https://issues.apache.org/jira/browse/FLINK-14074
Job starts and then yarn logs report "*Could not resolve ResourceManager
address akka.tcp://flink*"

A fragment from yarn logs looks like this:

LazyFromSourcesSchedulingStrategy]
16:54:21,279 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
 - Job Flink Java Job at Thu Mar 26 16:54:09 CET 2020
(9817283f911d83a6d278cc39d17d6b11) switched from state CREATED to RUNNING.
16:54:21,287 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
 - CHAIN DataSource (MailEvent; EMC; 2019-12-01 - 2020-01-01; null -
157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
FlatMap (Get mail item by EMC event) -> Map (Map IntraregionalVolumeItem
data set from EMC events) (1/3) (5482b0e6ae1d64d9b0918ec15599211f) switched
from CREATED to SCHEDULED.
16:54:21,287 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
 - CHAIN DataSource (MailEvent; EMC; 2019-12-01 - 2020-01-01; null -
157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
FlatMap (Get mail item by EMC event) -> Map (Map IntraregionalVolumeItem
data set from EMC events) (2/3) (5c993710423eea47ae66f833b2999530) switched
from CREATED to SCHEDULED.
16:54:21,287 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
 - CHAIN DataSource (MailEvent; EMC; 2019-12-01 - 2020-01-01; null -
157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
FlatMap (Get mail item by EMC event) -> Map (Map IntraregionalVolumeItem
data set from EMC events) (3/3) (23cfa30fba857b2c75ba76a21c7d4972) switched
from CREATED to SCHEDULED.
16:54:21,287 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
 - CHAIN DataSource (MailEvent; EMD; 2019-12-01 - 2020-01-01; null -
157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
FlatMap (Get mail item by EMD event) -> Map (Map IntraregionalVolumeItem
data set from EMD events) (1/3) (7cc8a395b87e82000184724eb1698ace) switched
from CREATED to SCHEDULED.
16:54:21,288 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
 - CHAIN DataSource (MailEvent; EMD; 2019-12-01 - 2020-01-01; null -
157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
FlatMap (Get mail item by EMD event) -> Map (Map IntraregionalVolumeItem
data set from EMD events) (2/3) (5edfe3d1f509856d17fa0da078cb3f7e) switched
from CREATED to SCHEDULED.
16:54:21,288 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
 - CHAIN DataSource (MailEvent; EMD; 2019-12-01 - 2020-01-01; null -
157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
FlatMap (Get mail item by EMD event) -> Map (Map IntraregionalVolumeItem
data set from EMD events) (3/3) (dd3397f889a3fad1acf4c59f59a93d92) switched
from CREATED to SCHEDULED.
16:54:21,297 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
 - Cannot serve slot request, no ResourceManager connected. Adding as
pending request [SlotRequestId{b4c6e7357e4620bf2e997c46d7723eb1}]
16:54:21,301 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
 - Cannot serve slot request, no ResourceManager connected. Adding as
pending request [SlotRequestId{841bbb79b01b5e0d9ae749a03f65c303}]
16:54:21,301 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
 - Cannot serve slot request, no ResourceManager connected. Adding as
pending request [SlotRequestId{496120465d541ea9fd2ffcec89e2ac3b}]
16:54:21,304 INFO  org.apache.flink.runtime.jobmaster.JobMaster
 - Connecting to ResourceManager akka.tcp://
fl...@prod-bigd-dn11.net:43757/user/resourcemanager()
16:54:21,307 INFO  org.apache.flink.runtime.jobmaster.JobMaster
 - Could not resolve ResourceManager address
akka.tcp://flink@prod-bigd-dn11:43757/user/resourcemanager, retrying in
10000 ms: Could not connect to rpc endpoint under address akka.tcp://
fl...@prod-bigd-dn11.net:43757/user/resourcemanager..
16:54:31,322 INFO  org.apache.flink.runtime.jobmaster.JobMaster
 - Could not resolve ResourceManager address
akka.tcp://flink@prod-bigd-dn11:43757/user/resourcemanager, retrying in
10000 ms: Could not connect to rpc endpoint under address
akka.tcp://flink@prod-bigd-dn11:43757/user/resourcemanager..

What can cause following problems?
*Cannot serve slot request, no ResourceManager connected*
*Could not resolve ResourceManager address
akka.tcp://flink@prod-bigd-dn11:43757*

Regards,
Vitaliy


Re: Emit message at start and end of event time session window

2020-03-26 Thread Till Rohrmann
A quick update concerning your observations. The reason why you are seeing
the unordered output is because in the gist we used
a AssignerWithPeriodicWatermarks which generates watermarks periodically.
Due to this aspect, it can happen that Flink already processes all elements
up to "20" before it sees the next watermark which triggers the processing.
If there are multiple windows being processed, Flink does not give a
guarantee in which order this happens.

You can avoid this behaviour if you use
an AssignerWithPunctuatedWatermarks instead. This watermark assigner is
called for every record. The updated gist [1] shows how it is used.

[1] https://gist.github.com/tillrohrmann/dda90b8b0e67e379a8dfee967fbd9af1
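
(For reference, a minimal sketch of such a punctuated assigner; the Element type and its getTimestamp() accessor are assumptions, the gist may use different names:)

// (imports: org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks,
//  org.apache.flink.streaming.api.watermark.Watermark)
public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<Element> {

    @Override
    public long extractTimestamp(Element element, long previousElementTimestamp) {
        return element.getTimestamp();                          // assumed accessor on Element
    }

    @Override
    public Watermark checkAndGetNextWatermark(Element lastElement, long extractedTimestamp) {
        // called for every record, so a watermark follows each element immediately
        return new Watermark(extractedTimestamp);
    }
}

// usage: stream.assignTimestampsAndWatermarks(new PunctuatedAssigner())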

Cheers,
Till

On Thu, Mar 26, 2020 at 4:27 PM Till Rohrmann  wrote:

> Hmm, I might have given you a bad advice. I think the problem becomes
> harder because with Flink's window and trigger API we need to keep state
> consistent between the Trigger and the Window function. Maybe it would be
> easier to not rely on the windowing mechanism and instead to use Flink's
> process function [1] to implement the logic yourself.
>
> With the process function you have basically a low level API with which
> you can implement an operator which groups incoming events according to
> sessions and outputs the required information.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>
> Cheers,
> Till
>
> On Thu, Mar 26, 2020 at 11:27 AM Manas Kale  wrote:
>
>> Hi Till,
>> I see, thanks for the clarification.
>> Assuming all other settings are the same, if I generate events as follows
>> :
>> Element.from("1", 1000L),
>> Element.from("2", 2000L),
>> Element.from("3", 3000L),
>> Element.from("10", 10000L)
>> ,Element.from("11", 11000L),
>> Element.from("12", 12000L),
>> Element.from("20", 20000L)
>> we will expect 2 session windows to be created {1,2,3} and {10,11,12}
>> with appropriate messages. However, when I run this, there seems to be a
>> problem in the valueState of MyWindowFunction. Apparently that state is
>> being shared by both the session windows, which leads to incorrect results.
>> To solve this, I replaced it with a MapState. The Long is
>> the start timestamp of a window, something that can uniquely identify
>> different windows. This works but with one caveat : if we have two
>> subsequent windows, the ordering of messages is :
>>
>> window1 started @ 1000 -> window2 started @ 10000 -> window1 ended @ 8000
>> -> window2 ended @ 17000
>>
>> whereas I expect it to be :
>> window1 started @ 1000 -> window1 ended @ 8000 -> window2 started @ 10000
>> -> window2 ended @ 17000
>>
>> I thought Flink would execute event time timers and process events in
>> chronological event time order. However, it seems that the onEventTime()
>> invocation of window1 is called *after *elements from window2 have been
>> processed even though window1's onEventTime() is earlier in event time.
>>
>> Is my approach and reasoning correct? Also, is it possible to get the
>> messages in the expected order?
>>
>> Thanks!
>>
>>
>>
>>
>>
>> On Thu, Mar 26, 2020 at 2:55 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Manas,
>>>
>>> the problem is that the print() statement is being executed with a
>>> different parallelism than 1. Due to this fact, the messages coming from
>>> the window function will be sent in round-robin fashion to the print
>>> operators. If you remove the setParallelism(1) from the window function,
>>> then the window function will be executed with the same parallelism as the
>>> print operator. Due to this fact, there is no round-robin distribution of
>>> the events but every window function task will simply forward its
>>> elements to its print operator task. You should be able to see these
>>> topology differences in the web ui.
>>>
>>> You could configure the print() operator to run with a parallelism of 1
>>> as well by adding a setParallelism(1) statement to it.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Mar 26, 2020 at 7:11 AM Manas Kale 
>>> wrote:
>>>
 Hi Till,
 When I run the example code that you posted, the order of the three
 messages (window started, contents of window and window ended) is
 non-deterministic. This is surprising to me, as setParallelism(1) has been
 used in the pipeline - I assumed this should eliminate any form of race
 conditions for printing. What's more is that if I *remove*
 setParallelism(1) from the code, the output is deterministic and correct
 (i.e. windowStarted -> windowContents -> windowEnded).

 Clearly, something is wrong with my understanding. What is it?

 On Fri, Feb 28, 2020 at 1:58 PM Till Rohrmann 
 wrote:

> Great to hear that you solved the problem. Let us know if you run into
> any other issues.
>
> Cheers,
> Till
>
> On Fri, Feb 28, 2020 at 8:08 

Re: Emit message at start and end of event time session window

2020-03-26 Thread Till Rohrmann
Hmm, I might have given you a bad advice. I think the problem becomes
harder because with Flink's window and trigger API we need to keep state
consistent between the Trigger and the Window function. Maybe it would be
easier to not rely on the windowing mechanism and instead to use Flink's
process function [1] to implement the logic yourself.

With the process function you have basically a low level API with which you
can implement an operator which groups incoming events according to
sessions and outputs the required information.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
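
(A rough sketch of that idea; the Element type, String keys, the 5-second session gap and all other names are assumptions rather than code from the gist:)

// (imports: org.apache.flink.streaming.api.functions.KeyedProcessFunction,
//  org.apache.flink.api.common.state.ValueState / ValueStateDescriptor,
//  org.apache.flink.configuration.Configuration, org.apache.flink.util.Collector)
public class SessionMessages extends KeyedProcessFunction<String, Element, String> {

    private static final long GAP = 5000L;                      // assumed session gap

    private transient ValueState<Long> sessionStart;             // start of the current session
    private transient ValueState<Long> endTimer;                 // currently registered end timer

    @Override
    public void open(Configuration parameters) {
        sessionStart = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sessionStart", Long.class));
        endTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("endTimer", Long.class));
    }

    @Override
    public void processElement(Element value, Context ctx, Collector<String> out) throws Exception {
        long ts = ctx.timestamp();
        if (sessionStart.value() == null) {
            sessionStart.update(ts);
            out.collect("window started @ " + ts);                // first element opens the session
        }
        Long previous = endTimer.value();
        if (previous != null) {
            ctx.timerService().deleteEventTimeTimer(previous);    // push the session end forward
        }
        long end = ts + GAP;
        endTimer.update(end);
        ctx.timerService().registerEventTimeTimer(end);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        out.collect("window ended @ " + timestamp);               // gap elapsed in event time
        sessionStart.clear();
        endTimer.clear();
    }
}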

Cheers,
Till

On Thu, Mar 26, 2020 at 11:27 AM Manas Kale  wrote:

> Hi Till,
> I see, thanks for the clarification.
> Assuming all other settings are the same, if I generate events as follows :
> Element.from("1", 1000L),
> Element.from("2", 2000L),
> Element.from("3", 3000L),
> Element.from("10", 10000L)
> ,Element.from("11", 11000L),
> Element.from("12", 12000L),
> Element.from("20", 20000L)
> we will expect 2 session windows to be created {1,2,3} and {10,11,12} with
> appropriate messages. However, when I run this, there seems to be a problem
> in the valueState of MyWindowFunction. Apparently that state is being
> shared by both the session windows, which leads to incorrect results.
> To solve this, I replaced it with a MapState. The Long is
> the start timestamp of a window, something that can uniquely identify
> different windows. This works but with one caveat : if we have two
> subsequent windows, the ordering of messages is :
>
> window1 started @ 1000 -> window2 started @ 10000 -> window1 ended @ 8000
> -> window2 ended @ 17000
>
> whereas I expect it to be :
> window1 started @ 1000 -> window1 ended @ 8000 -> window2 started @ 10000
> -> window2 ended @ 17000
>
> I thought Flink would execute event time timers and process events in
> chronological event time order. However, it seems that the onEventTime()
> invocation of window1 is called *after *elements from window2 have been
> processed even though window1's onEventTime() is earlier in event time.
>
> Is my approach and reasoning correct? Also, is it possible to get the
> messages in the expected order?
>
> Thanks!
>
>
>
>
>
> On Thu, Mar 26, 2020 at 2:55 PM Till Rohrmann 
> wrote:
>
>> Hi Manas,
>>
>> the problem is that the print() statement is being executed with a
>> different parallelism than 1. Due to this fact, the messages coming from
>> the window function will be sent in round-robin fashion to the print
>> operators. If you remove the setParallelism(1) from the window function,
>> then the window function will be executed with the same parallelism as the
>> print operator. Due to this fact, there is no round-robin distribution of
>> the events but every window function task will simply forward its
>> elements to its print operator task. You should be able to see these
>> topology differences in the web ui.
>>
>> You could configure the print() operator to run with a parallelism of 1
>> as well by adding a setParallelism(1) statement to it.
>>
>> Cheers,
>> Till
>>
>> On Thu, Mar 26, 2020 at 7:11 AM Manas Kale  wrote:
>>
>>> Hi Till,
>>> When I run the example code that you posted, the order of the three
>>> messages (window started, contents of window and window ended) is
>>> non-deterministic. This is surprising to me, as setParallelism(1) has been
>>> used in the pipeline - I assumed this should eliminate any form of race
>>> conditions for printing. What's more is that if I *remove*
>>> setParallelism(1) from the code, the output is deterministic and correct
>>> (i.e. windowStarted -> windowContents -> windowEnded).
>>>
>>> Clearly, something is wrong with my understanding. What is it?
>>>
>>> On Fri, Feb 28, 2020 at 1:58 PM Till Rohrmann 
>>> wrote:
>>>
 Great to hear that you solved the problem. Let us know if you run into
 any other issues.

 Cheers,
 Till

 On Fri, Feb 28, 2020 at 8:08 AM Manas Kale 
 wrote:

> Hi,
> This problem is solved[1]. The issue was that the BroadcastStream did
> not contain any watermark, which prevented watermarks for any downstream
> operators from advancing.
> I appreciate all the help.
> [1]
> https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern
>
> Thanks,
> Manas
>
> On Thu, Feb 27, 2020 at 4:28 PM Manas Kale 
> wrote:
>
>> Hi Rafi and Till,
>> Thank you for pointing out that edge case, Rafi.
>>
>> Till, I am trying to get this example working with the BroadcastState
>> pattern upstream to the window operator[1]. The problem is that 
>> introducing
>> the BroadcastState makes the onEventTime() *never* fire. Is the
>> BroadcastState somehow eating up the watermark? Do I need to generate the watermark again in the KeyedBroadcastProcessFunction?

Re: Re: How are the jars in the Flink distribution built?

2020-03-26 Thread LakeShen
If you compile and package the whole Flink source tree, you will see the Flink
command-line scripts under the target directory of the flink-dist module, and
under its lib directory you will find the Flink jars.

Best wishes,
沈磊

godfrey he wrote on Thu, Mar 26, 2020 at 8:51 PM:

> Currently Flink supports Scala 2.11 and Scala 2.12. By default, mvn package produces artifacts built against Scala
> 2.11, e.g. flink-table-blink_*2.11*-1.10.0.jar.
> You can pass -Dscala-2.12 to select Scala 2.12, which produces artifacts like flink-table-blink_*2.12*
> -1.10.0.jar.
>
> Best,
> Godfrey
>
> wangl...@geekplus.com.cn wrote on Thu, Mar 26, 2020
> at 6:34 PM:
>
> >
> > Under flink-table-uber-blink:
> >  mvn clean install -DskipTests -Dscala-2.12 -DskipTests
> >
> > I'm not sure how -Dscala-2.12 takes effect, but this jar can directly replace the one on the server and works fine.
> >
> > Thanks,
> > 王磊
> >
> >
> > wangl...@geekplus.com.cn
> >
> > Sender: Kurt Young
> > Send Time: 2020-03-26 18:15
> > Receiver: user-zh
> > cc: jihongchao
> > Subject: Re: How are the jars in the Flink distribution built?
> > flink-table-uber-blink should be the module you want; it produces the fat jar (uber jar) for the blink planner
> >
> > Best,
> > Kurt
> >
> >
> > On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn <
> > wangl...@geekplus.com.cn> wrote:
> >
> > >
> > > For the standalone distribution I downloaded and extracted the tgz; under the lib directory there are several jars, e.g. flink-table-blink_2.12-1.10.0.jar.
> > > Where is this jar built from?
> > >
> > > I cloned the source code from GitHub and ran mvn clean package.
> > > I assumed that the flink-table-planner-blink_2.11-1.10.0.jar built under flink-table/flink-table-planner-blink
> > > corresponds to flink-table-blink_2.12-1.10.0.jar in the released tgz.
> > > I replaced it directly in the installation directory; start-cluster.sh still works, but ./bin/sql-client.sh embedded fails.
> > >
> > > Thanks,
> > > 王磊
> > >
> > >
> > >
> > > wangl...@geekplus.com.cn
> > >
> > >
> >
>


Flink 1.10: a question about the RocksDBStateBackend in-background state cleanup mechanism

2020-03-26 Thread LakeShen
Hi everyone in the community,

I have a question about the state cleanup mechanism of the RocksDBStateBackend in Flink 1.10. In 1.10, RocksDB
cleans up state in the background by default, using the compaction filter approach. As the official documentation says:

> RocksDB compaction filter will query current timestamp, used to check
> expiration, from Flink every time after processing certain number of state
> entries.


My question: since RocksDB runs the compaction filter after processing a certain number of state entries, is the compaction
filter applied only to that batch of state entries, checking whether they have expired?
Or is it applied to all state files the task currently has, so that during compaction every entry is checked and the keys
whose state has expired are filtered out and deleted?
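
(For context, a minimal sketch of how this in-background cleanup is configured; the TTL value and the 1000-entries figure are just example values:)

// (imports: org.apache.flink.api.common.state.StateTtlConfig,
//  org.apache.flink.api.common.state.ValueStateDescriptor, org.apache.flink.api.common.time.Time)
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(7))                 // example TTL only
        // ask the RocksDB compaction filter to re-query the current timestamp from Flink
        // after every 1000 processed state entries
        .cleanupInRocksdbCompactFilter(1000)
        .build();

ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("myState", String.class);
descriptor.enableTimeToLive(ttlConfig);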

I have not fully understood this part; really looking forward to your reply.

Best wishes,
沈磊


Re: Re: How are the jars in the Flink distribution built?

2020-03-26 Thread godfrey he
Currently Flink supports Scala 2.11 and Scala 2.12. By default, mvn package produces artifacts built against Scala
2.11, e.g. flink-table-blink_*2.11*-1.10.0.jar.
You can pass -Dscala-2.12 to select Scala 2.12, which produces artifacts like flink-table-blink_*2.12*
-1.10.0.jar.

Best,
Godfrey

wangl...@geekplus.com.cn wrote on Thu, Mar 26, 2020 at 6:34 PM:

>
> Under flink-table-uber-blink:
>  mvn clean install -DskipTests -Dscala-2.12 -DskipTests
>
> I'm not sure how -Dscala-2.12 takes effect, but this jar can directly replace the one on the server and works fine.
>
> Thanks,
> 王磊
>
>
> wangl...@geekplus.com.cn
>
> Sender: Kurt Young
> Send Time: 2020-03-26 18:15
> Receiver: user-zh
> cc: jihongchao
> Subject: Re: How are the jars in the Flink distribution built?
> flink-table-uber-blink should be the module you want; it produces the fat jar (uber jar) for the blink planner
>
> Best,
> Kurt
>
>
> On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
> >
> > For the standalone distribution I downloaded and extracted the tgz; under the lib directory there are several jars, e.g. flink-table-blink_2.12-1.10.0.jar.
> > Where is this jar built from?
> >
> > I cloned the source code from GitHub and ran mvn clean package.
> > I assumed that the flink-table-planner-blink_2.11-1.10.0.jar built under flink-table/flink-table-planner-blink
> > corresponds to flink-table-blink_2.12-1.10.0.jar in the released tgz.
> > I replaced it directly in the installation directory; start-cluster.sh still works, but ./bin/sql-client.sh embedded fails.
> >
> > Thanks,
> > 王磊
> >
> >
> >
> > wangl...@geekplus.com.cn
> >
> >
>


Re: savepoint - checkpoint - directory

2020-03-26 Thread Yun Tang
Hi Fanbin

To resume from a checkpoint, you should provide at least the directory named 
/path/chk-x or the file /path/chk-x/_metadata. The sub-directory named “shared” is used to 
store incremental checkpoint content. You could refer to [1] for more 
information.

BTW, stop with savepoint could help reduce source rewind time.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#directory-structure

Get Outlook for Android


From: Fanbin Bu 
Sent: Thursday, March 26, 2020 2:53:29 AM
To: user 
Subject: savepoint - checkpoint - directory

Hi,

For savepoint, the dir looks like
s3://bucket/savepoint-jobid/*

To resume, i do:
flink run -s s3://bucket/savepoint-jobid/
perfect!


For checkpoint, the dir looks like
s3://bucket/jobid/chk-100
s3://bucket/jobid/shared.   <-- what is this for?

To resume, which one should i do:
flink run -s s3://bucket/jobid
or
flink run -s s3://bucket/jobid/chk-100


Another question, I saw that `flink cancel` is deprecated and recommend to use 
`flink stop`. But isn't this causing production down time? In order to avoid 
down time, is it recommended to just do `flink savepoint`?

Thanks,
Fanbin


Flink Weekly | Weekly Community Update - 2020/03/26

2020-03-26 Thread forideal
Hi everyone, this is the tenth issue of Flink Weekly, compiled by 张成. It covers recent community development progress, mailing-list Q&A, community live streams, and related technical blog posts.
Community development progress

[release] The discussion on releasing Flink 1.10.1 is in full swing; for the latest status see the discussion started by Yu Li.

[1]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-10-1-td38689.html




[Checkpoint] The vote on FLIP-76, started by Arvid Heise, has passed. FLIP-76 proposes a way to execute checkpoints with non-blocking alignment based on checkpoint barriers.

The benefits include:

Upstream operators can keep producing data even while some operators are still waiting for checkpoint barriers on their input channels.

Checkpointing times across the whole execution graph are greatly reduced, even for operators with a single input channel.

End users see more progress even in unstable environments, because more timely checkpoints avoid excessive recomputation.

It enables faster rescaling.

More information:

[2]https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints

[3]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-76-Unaligned-checkpoints-td33651.html




[Connectors/Filesystem] Dropping BucketingSink. BucketingSink was already marked as deprecated in Flink 1.9;
Flink has a new StreamingFileSink that replaces BucketingSink. The Scala builder of StreamingFileSink currently
has a bug.

[4]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/jira-Created-FLINK-16616-Drop-BucketingSink-td38950.html

[5]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Drop-Bucketing-Sink-td38830.html#a38831

[6]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/jira-Created-FLINK-16684-StreamingFileSink-builder-does-not-work-with-Scala-td39109.html




[Table API & SQL] Jingsong Li 发起了引入 StatefulSequenceSource 的讨论。这个能够方便用户更好的进行测试 
SQL。最终讨论决定在 Table 支持 DataGenerator 的 source、Print 的 sink 和blackhole 的 sink。

[7]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Introduce-TableFactory-for-StatefulSequenceSource-td39116.html




[sql] Timo shared a proposal for new TableSource and TableSink
interfaces (FLIP-95). Jark, Dawid, Aljoscha, Kurt, Jingsong
and others joined the discussion. The goal is to simplify the current interface architecture, support changelog sources (FLIP-105), and remove the dependencies on the DataStream API and the planner.

[8]https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces




[hadoop] Following up on the discussion by Stephan and Till, Sivaprasanna shared an overview of the Hadoop-related
utility components to kick off a discussion about moving them into a separate module, “flink-hadoop-utils”.

[9]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/SerializableHadoopConfiguration-td38371.html

[10]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Introduce-a-new-module-flink-hadoop-utils-td39107.html




User questions

叶贤勋 ran into a Kerberos authentication problem when using the Hive source. Community members discussed it and offered suggestions; if interested see the link below:

[11]http://apache-flink.147419.n8.nabble.com/Hive-Source-With-Kerberos-td1688.html




hiliuxg asked how Flink SQL can trigger, every 5 minutes, an aggregation covering the period from midnight of the current day up to the current 5-minute mark. Jark Wu and Tianwang Li
answered.

[12]http://apache-flink.147419.n8.nabble.com/flink-sql-5-5-td2011.html




hiliuxg asked about performance optimization of COUNT DISTINCT in Flink SQL. Benchao Li, 田志声, Lucas Wu and Lake Shen
discussed it; if interested see the link below:

[13]http://apache-flink.147419.n8.nabble.com/flink-sql-td2012.html




王志华 asked how Flink DDL supports custom source/sink tables. Community members gave detailed answers in the thread.

[14]http://apache-flink.147419.n8.nabble.com/ddl-td1959.html




111 asked how to optimize large-table joins in Flink SQL 1.10. Jark Wu, Kurt Young and Jingsong Lee
answered in detail. Currently the parallelism of Flink SQL operators (other than sources)
is not inferred automatically; it has to be set via table.exec.resource.default-parallelism. For details see:

[15]http://apache-flink.147419.n8.nabble.com/Flink-SQL1-10-join-td2044.html

[16]http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-How-can-i-set-parallelism-in-clause-of-group-by-td33736.html




Aaron Levin asked how to change a job's parallelism and then start the job from a checkpoint. Piotr Nowojski and Till
Rohrmann joined the discussion, which also touched on the impact of unaligned checkpoints (FLIP-76) on savepoints and checkpoints.
Lake Shen raised a similar question. If interested see:

[17]http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Expected-behaviour-when-changing-operator-parallelism-but-starting-from-an-incremental-checkpoint-td33608.html

[18]http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cancel-the-flink-task-and-restore-from-checkpoint-can-I-change-the-flink-operator-s-parallelism-td33613.html




Jiawei Wu asked how to use Flink SQL to compute inventory data grouped by supplier where the items have been in stock for more than 15 days. If interested see:

[19]http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-flink-to-calculate-sum-of-the-inventory-under-certain-conditions-td33323.html




Vinod Mehra raised a fairly involved question about joins, which Timo Walther answered; the thread also covers some techniques for troubleshooting
Flink SQL problems. If interested see:

[20]http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/time-windowed-joins-and-tumbling-windows-td33551.html

Events, blog posts and others




Over 50% of development tasks in SQL! The evolution and optimization of real-time computing at Didi

[21]https://ververica.cn/corporate_practice/evolution-and-optimization-of-didi-real-time-computing/




Flink ecosystem: getting started with PyFlink through a single example

[22]https://ververica.cn/developers/pyflink-a-case-in-hand/




Can one set of SQL
handle the whole data warehouse? A new attempt in Flink [23]https://ververica.cn/developers/a-set-of-sql-to-handle-data-warehouse/




How to plan RocksDB memory capacity in Flink?

[24]https://ververica.cn/developers/how-to-plan-the-memory-capacity-of-rocksdb-in-flink/





Re: Re: How are the jars in the Flink distribution built?

2020-03-26 Thread wangl...@geekplus.com.cn

Under flink-table-uber-blink:
 mvn clean install -DskipTests -Dscala-2.12 -DskipTests

I'm not sure how -Dscala-2.12 takes effect, but this jar can directly replace the one on the server and works fine.

Thanks,
王磊


wangl...@geekplus.com.cn 
 
Sender: Kurt Young
Send Time: 2020-03-26 18:15
Receiver: user-zh
cc: jihongchao
Subject: Re: How are the jars in the Flink distribution built?
flink-table-uber-blink should be the module you want; it produces the fat jar (uber jar) for the blink planner
 
Best,
Kurt
 
 
On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
>
> For the standalone distribution I downloaded and extracted the tgz; under the lib directory there are several jars, e.g. flink-table-blink_2.12-1.10.0.jar.
> Where is this jar built from?
>
> I cloned the source code from GitHub and ran mvn clean package.
> I assumed that the flink-table-planner-blink_2.11-1.10.0.jar built under flink-table/flink-table-planner-blink
> corresponds to flink-table-blink_2.12-1.10.0.jar in the released tgz.
> I replaced it directly in the installation directory; start-cluster.sh still works, but ./bin/sql-client.sh embedded fails.
>
> Thanks,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>


Re: Emit message at start and end of event time session window

2020-03-26 Thread Manas Kale
Hi Till,
I see, thanks for the clarification.
Assuming all other settings are the same, if I generate events as follows :
Element.from("1", 1000L),
Element.from("2", 2000L),
Element.from("3", 3000L),
Element.from("10", 10000L)
,Element.from("11", 11000L),
Element.from("12", 12000L),
Element.from("20", 20000L)
we will expect 2 session windows to be created {1,2,3} and {10,11,12} with
appropriate messages. However, when I run this, there seems to be a problem
in the valueState of MyWindowFunction. Apparently that state is being
shared by both the session windows, which leads to incorrect results.
To solve this, I replaced it with a MapState. The Long is
the start timestamp of a window, something that can uniquely identify
different windows. This works but with one caveat : if we have two
subsequent windows, the ordering of messages is :

window1 started @ 1000 -> window2 started @ 10000 -> window1 ended @ 8000
-> window2 ended @ 17000

whereas I expect it to be :
window1 started @ 1000 -> window1 ended @ 8000 -> window2 started @ 10000
-> window2 ended @ 17000

I thought Flink would execute event time timers and process events in
chronological event time order. However, it seems that the onEventTime()
invocation of window1 is called *after *elements from window2 have been
processed even though window1's onEventTime() is earlier in event time.

Is my approach and reasoning correct? Also, is it possible to get the
messages in the expected order?

Thanks!
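
(For illustration, the per-window bookkeeping described above could look roughly like this; the types, signatures and names are assumptions, not the actual code:)

// (a rough sketch only; MyWindowFunction's real signature and types may differ)
public class MyWindowFunction
        extends ProcessWindowFunction<Element, String, String, TimeWindow> {

    private transient MapState<Long, Boolean> startedPerWindow; // keyed by window start timestamp

    @Override
    public void open(Configuration parameters) {
        startedPerWindow = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("startedPerWindow", Long.class, Boolean.class));
    }

    @Override
    public void process(String key, Context ctx, Iterable<Element> elements,
                        Collector<String> out) throws Exception {
        long windowStart = ctx.window().getStart();              // uniquely identifies the window
        if (!startedPerWindow.contains(windowStart)) {
            startedPerWindow.put(windowStart, true);
            out.collect("window started @ " + windowStart);
        }
        // ... emit contents / end-of-window message here ...
        startedPerWindow.remove(windowStart);                     // clean up once the window is done
    }
}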





On Thu, Mar 26, 2020 at 2:55 PM Till Rohrmann  wrote:

> Hi Manas,
>
> the problem is that the print() statement is being executed with a
> different parallelism than 1. Due to this fact, the messages coming from
> the window function will be sent in round-robin fashion to the print
> operators. If you remove the setParallelism(1) from the window function,
> then the window function will be executed with the same parallelism as the
> print operator. Due to this fact, there is no round-robin distribution of
> the events but every window function task will simply forward its
> elements to its print operator task. You should be able to see these
> topology differences in the web ui.
>
> You could configure the print() operator to run with a parallelism of 1 as
> well by adding a setParallelism(1) statement to it.
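
(For example, a minimal sketch of the wiring being described; the variable and function names are made up:)

windowedStream
    .process(new MyWindowFunction())
    .setParallelism(1)    // window function runs with parallelism 1
    .print()
    .setParallelism(1);   // print sink also runs with parallelism 1, so elements are simply forwarded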
>
> Cheers,
> Till
>
> On Thu, Mar 26, 2020 at 7:11 AM Manas Kale  wrote:
>
>> Hi Till,
>> When I run the example code that you posted, the order of the three
>> messages (window started, contents of window and window ended) is
>> non-deterministic. This is surprising to me, as setParallelism(1) has been
>> used in the pipeline - I assumed this should eliminate any form of race
>> conditions for printing. What's more is that if I *remove*
>> setParallelism(1) from the code, the output is deterministic and correct
>> (i.e. windowStarted -> windowContents -> windowEnded).
>>
>> Clearly, something is wrong with my understanding. What is it?
>>
>> On Fri, Feb 28, 2020 at 1:58 PM Till Rohrmann 
>> wrote:
>>
>>> Great to hear that you solved the problem. Let us know if you run into
>>> any other issues.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Feb 28, 2020 at 8:08 AM Manas Kale 
>>> wrote:
>>>
 Hi,
 This problem is solved[1]. The issue was that the BroadcastStream did
 not contain any watermark, which prevented watermarks for any downstream
 operators from advancing.
 I appreciate all the help.
 [1]
 https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern

 Thanks,
 Manas

 On Thu, Feb 27, 2020 at 4:28 PM Manas Kale 
 wrote:

> Hi Rafi and Till,
> Thank you for pointing out that edge case, Rafi.
>
> Till, I am trying to get this example working with the BroadcastState
> pattern upstream to the window operator[1]. The problem is that 
> introducing
> the BroadcastState makes the onEventTime() *never* fire. Is the
> BroadcastState somehow eating up the watermark? Do I need to generate the
> watermark again in the KeyedBroadcastProcessFunction?
>
> [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>
> Thanks,
> Manas
>
> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann 
> wrote:
>
>> Hi Manas and Rafi,
>>
>> you are right that when using merging windows as event time session
>> windows are, then Flink requires that any state the Trigger keeps is of
>> type MergingState. This constraint allows that the state can be merged
>> whenever two windows get merged.
>>
>> Rafi, you are right. With the current implementation it might happen
>> that you send a wrong started window message. I think it depends on the
>> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
>> your watermark. If you want to be 

Re: Reply: Does the Flink JDBC Driver support creating streaming tables?

2020-03-26 Thread godfrey he
Another option is that the sql gateway supports --jar and --library to specify user jars; with that approach users do not need to put the jar under flink's lib directory.

godfrey he wrote on Wed, Mar 25, 2020 at 6:24 PM:

> hi 赵峰,
>
> The problem you hit is that the Kafka-related TableFactory cannot be found on the classpath; following zhenghua's suggestion should solve it. However, the Flink
> JDBC Driver currently only supports batch mode, while the Kafka table source only supports stream mode.
>
> Best,
> Godfrey
>
> Zhenghua Gao wrote on Wed, Mar 25, 2020 at 4:26 PM:
>
>> Please confirm whether the kafka connector jar is under flink/lib.
>> The current error looks like the kafka connector jar cannot be found.
>>
>> *Best Regards,*
>> *Zhenghua Gao*
>>
>>
>> On Wed, Mar 25, 2020 at 4:18 PM 赵峰  wrote:
>>
>> > It is not a syntax problem; creating the table also works for me, it is the query that fails. Have you tried querying the data, or writing data into a file table?
>> >
>> >
>> > 
>> >
>> > Refer to this document:
>> >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>> > The following syntax is probably not supported:
>> >   'format.type' = 'csv',\n" +
>> > "'format.field-delimiter' = '|'\n"
>> >
>> > Below is code that works for me; the data in kafka needs to be in this format: {"order_no":"abcdefg","status":90}
>> > tEnv.sqlUpdate("CREATE TABLE pick_order (\n"
>> > + "order_no VARCHAR,\n"
>> > + "status INT\n"
>> > + ") WITH (\n"
>> > + "'connector.type' = 'kafka',\n"
>> > + "'connector.version' = 'universal',\n"
>> > + "'connector.topic' = 'wanglei_test',\n"
>> > + "'connector.startup-mode' = 'latest-offset',\n"
>> > + "'connector.properties.0.key' = 'zookeeper.connect',\n"
>> > + "'connector.properties.0.value' = 'xxx:2181',\n"
>> > + "'connector.properties.1.key' = 'bootstrap.servers',\n"
>> > + "'connector.properties.1.value' = 'xxx:9092',\n"
>> > + "'update-mode' = 'append',\n"
>> > + "'format.type' = 'json',\n"
>> > + "'format.derive-schema' = 'true'\n"
>> > + ")");
>> >
>> > 王磊
>> >
>> >
>> > wangl...@geekplus.com.cn
>> > From: 赵峰
>> > Sent: 2020-03-24 21:28
>> > To: user-zh
>> > Subject: Does the Flink JDBC Driver support creating streaming tables?
>> > hi
>> >
>> > Creating a kafka table via the Flink JDBC Driver throws an error; is my table-creation code wrong? The code is as follows:
>> > Connection connection =
>> >
>> DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
>> > Statement statement = connection.createStatement();
>> > statement.executeUpdate(
>> > "CREATE TABLE table_kafka (\n" +
>> > "user_id BIGINT,\n" +
>> > "item_id BIGINT,\n" +
>> > "category_id BIGINT,\n" +
>> > "behavior STRING,\n" +
>> > "ts TIMESTAMP(3),\n" +
>> > "proctime as PROCTIME(),\n" +
>> > "WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
>> > ") WITH (\n" +
>> > "'connector.type' = 'kafka', \n" +
>> > "'connector.version' = 'universal', \n" +
>> > "'connector.topic' = 'flink_im02', \n" +
>> > "'connector.properties.group.id' = 'flink_im02_new',\n" +
>> > "'connector.startup-mode' = 'earliest-offset', \n" +
>> > "'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
>> > "'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
>> > "'format.type' = 'csv',\n" +
>> > "'format.field-delimiter' = '|'\n" +
>> > ")");
>> > ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
>> > while (rs1.next()) {
>> > System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
>> > }
>> > statement.close();
>> > connection.close();
>> > Error:
>> > Reason: Required context properties mismatch.
>> > The matching candidates:
>> > org.apache.flink.table.sources.CsvBatchTableSourceFactory
>> > Mismatched properties:
>> > 'connector.type' expects 'filesystem', but is 'kafka'
>> > 赵峰
>> >
>> > 
>> > Quoted from:
>> >
>> http://apache-flink.147419.n8.nabble.com/Flink-JDBC-Driver-tp2103p2104.html
>> >
>> >
>> >
>> >
>> > 赵峰
>>
>


Re: How are the jars in the Flink distribution built?

2020-03-26 Thread Kurt Young
flink-table-uber-blink should be the module you want; it is responsible for producing the fat jar (uber jar) for the blink planner

Best,
Kurt


On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> For the standalone version, I downloaded the tgz and unpacked it; under the lib directory there are several jars, for example flink-table-blink_2.12-1.10.0.jar.
> Where is this jar built from?
>
> I cloned the source code from github and ran mvn clean package.
> I assumed that the flink-table-planner-blink_2.11-1.10.0.jar built under
> flink-table/flink-table-planner-blink corresponds to the
> flink-table-blink_2.12-1.10.0.jar shipped in the release tgz.
> I replaced it directly in the installation directory; start-cluster.sh still works, but ./bin/sql-client.sh embedded fails with an error.
>
> Thanks,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>


Re: [udf questions]

2020-03-26 Thread jincheng sun
One fairly obvious problem is in the UDF definition: your sink table declares a STRING field, but your SQL SELECTs the result of the udf, so I think your UDF should also return a STRING, i.e. result_type=DataTypes.STRING(). Also make sure the udf really returns a string and not a json Object.

Best,
Jincheng


WuPangang  于2020年3月26日周四 下午5:24写道:

> Data as below:
>  
> {"host":"172.25.69.145","@timestamp":"2020-03-23T09:21:16.315Z","@version":"1","offset":532178261,"logmark":"advertise-api","type":"squirrel","path":"/home/logs/app/action_log/advertise_api/AdStatistic.2020-03-23.log","message":"{\"date_time\":\"2020-03-23
> 17:21:15\",\"serverTime\":1584955275,\"currentTime\":\"1584955274\",\"distinct_id\":\"734232168\",\"device_id\":\"ad2c15b7cf910cc6\",\"event\":\"view_material\",\"click_pos\":\"ad_view_avatar\",\"material_name\":\"\\u5934\\u50cf\\u66dd\\u5149\",\"phase\":\"prod\",\"source\":\"phone\",\"client_v\":\"2.100\",\"platform\":\"android\",\"ip\":\"39.84.23.81\",\"network\":\"WIFI\",\"idfa\":\"867179032526091\",\"unique_device_id\":\"ad2c15b7cf910cc6\",\"manufacturer\":\"HUAWEI\",\"model\":\"PRA-AL00X\",\"carrier\":\"\",\"local_dns\":\"0.0,0.0\",\"user_id\":\"734232168\",\"is_login\":1,\"url_path\":\"http:\\/\\/
> down-ddz.734399.com\\/download\\/shuabao\\/shuabao10-awddz-release-hl.apk\",\"country\":\"\",\"province\":\"\",\"city\":\"\",\"setup_source\":\"androiddefault\",\"seller_id\":\"139091\",\"plan_id\":\"380141\",\"plan_name\":\"HL-10-12.27-\\u5927\\u80f8-\\u6e38\\u620f\",\"put_platform_id\":\"25\",\"put_ad_type_id\":\"27\",\"put_position_id\":\"0\",\"put_sell_type_id\":\"0\",\"material_id\":\"378120\",\"show_id\":\"SMALL_VIDEO_6948314\",\"put_start_time\":\"1577808000\",\"plan_params\":\"advertise-380141-378120\",\"download_app_name\":\"\\u7231\\u73a9\\u6597\\u5730\\u4e3b\",\"ad_type\":\"0\",\"put_source\":\"1\",\"is_ad_recommend\":\"\",\"third_video_url\":\"\",\"third_img_url\":\"\",\"ua\":\"JuMei\\/
> (PRA-AL00X; Android; Android OS ; 8.0.0; zh)
> ApacheHttpClient\\/4.0\",\"platform_v\":\"2.100\",\"played_time\":\"\",\"video_time\":\"\",\"status\":1,\"target_link\":\"http:\\/\\/
> down-ddz.734399.com
> \\/download\\/shuabao\\/shuabao10-awddz-release-hl.apk\",\"ad_material_title\":\"\\u5de5\\u8d44\\u6ca13W\\uff0c\\u5c31\\u73a9\\u8fd9\\u6597\\u5730\\u4e3b\\uff0c\\u6bcf\\u5929\\u90fd\\u80fd\\u9886\\u7ea2\\u5305~\",\"ad_material_desc\":\"\\u73a9\\u6597\\u5730\\u4e3b\\uff0c\\u7ea2\\u5305\\u79d2\\u4f53\\u73b0\",\"icon_url\":\"http:\\/\\/
> p12.jmstatic.com
> \\/adv\\/\\/material\\/20190920\\/jmadv5d849b203750f.png\",\"button_text\":\"\\u7acb\\u5373\\u4e0b\\u8f7d\",\"material_type\":\"video\",\"third_app_id\":\"\",\"third_pos_id\":\"\",\"download_apk\":\"\",\"package_name\":\"\",\"h5_url\":\"\",\"message\":\"\",\"unit_price\":\"4.15\",\"balance_type\":\"3\",\"ecpm\":\"15.189\",\"accumulate_ctr\":\"0.366\",\"pre_ctr\":\"0.366\",\"real_unit_price\":\"0\",\"pre_charge\":\"0\",\"pre_change\":\"0\",\"subsidy_type\":\"\",\"subsidy_ratio\":\"\",\"suppress_type\":\"0\",\"suppress_ratio\":\"0\",\"two_ecpm\":\"15.165\",\"real_ecpm\":\"0\",\"real_unit_price_threshold\":\"0\",\"plan_accumulate_pv\":\"0\",\"plan_accumulate_cpv\":\"0\",\"plan_accumulate_change\":\"0\",\"request_id\":\"ad2c15b7cf910cc6-1584955121.768552967\",\"ad_activity_type\":\"0\",\"one_cost\":\"4.15\",\"bid\":\"0.366\",\"real_spr\":\"0.998419\",\"one_ecpm\":\"15.189\",\"es_one\":\"0,16,\",\"es_two\":\"0\",\"es_three\":\"0\",\"es_four\":\"0\",\"age\":\"1020\",\"sex\":\"0\",\"one_cost_expend\":\"4.15\",\"two_cost_expend\":\"4.153438\",\"reward_source\":\"\",\"provide\":\"\"}","topicid":"advertise_module","project":"advertise-api”}
> Problem:
> The data is nested json, and the format of the core field "message" cannot be handled directly with the table api's json-related methods.
> My own idea for a solution: handle it in a udf using json.loads.
> The problem I hit in practice: after the job is submitted, the dashboard shows bytes received and records received
> are both 0; the downstream side writes to Kafka, and consuming the downstream kafka topic shows no data either.
>
> Code as below:
> from pyflink.datastream import
> StreamExecutionEnvironment,CheckpointingMode,TimeCharacteristic
> from pyflink.table import StreamTableEnvironment,
> EnvironmentSettings,TableSink,TableConfig,DataTypes
> from pyflink.table.descriptors import
> Schema,Kafka,Rowtime,ConnectTableDescriptor,CustomConnectorDescriptor,Json
> from pyflink.common import RestartStrategies
> from pyflink.table.udf import udf
> import json
>
> env = StreamExecutionEnvironment.get_execution_environment()
> #env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> ## checkpoint settings
> #env.enable_checkpointing(30)
>
> #env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode().EXACTLY_ONCE)
> #env.get_checkpoint_config().set_min_pause_between_checkpoints(3)
> #env.get_checkpoint_config().set_checkpoint_timeout(6)
> #env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
> #env.get_checkpoint_config().set_prefer_checkpoint_for_recovery(False)
> ## container settings
> env.set_parallelism(12)
> env.set_restart_strategy(RestartStrategies.fixed_delay_restart(50,6))
> ## use the blink api
> environment_settings =
> EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
> table_env =
> 

How are the jars in the Flink distribution built?

2020-03-26 Thread wangl...@geekplus.com.cn

For the standalone version, I downloaded the tgz and unpacked it; under the lib directory there are several jars, for example flink-table-blink_2.12-1.10.0.jar.
Where is this jar built from?

I cloned the source code from github and ran mvn clean package.
I assumed that the flink-table-planner-blink_2.11-1.10.0.jar built under
flink-table/flink-table-planner-blink corresponds to the
flink-table-blink_2.12-1.10.0.jar shipped in the release tgz.
I replaced it directly in the installation directory; start-cluster.sh still works, but ./bin/sql-client.sh embedded fails with an error.

Thanks,
王磊



wangl...@geekplus.com.cn 



Re: Emit message at start and end of event time session window

2020-03-26 Thread Till Rohrmann
Hi Manas,

the problem is that the print() statement is being executed with a
different parallelism than 1. Due to this fact, the messages coming from
the window function will be sent in round-robin fashion to the print
operators. If you remove the setParallelism(1) from the window function,
then the window function will be executed with the same parallelism as the
print operator. Due to this fact, there is no round-robin distribution of
the events but every window function task will simply forward its
elements to its print operator task. You should be able to see these
topology differences in the web ui.

You could configure the print() operator to run with a parallelism of 1 as
well by adding a setParallelism(1) statement to it.
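
For example, something along these lines (a minimal sketch; "windowedAndProcessed" is just a placeholder for the stream coming out of the window function in the gist):

// Force the print sink to a single task so all window messages
// are printed by one thread, in order.
windowedAndProcessed
        .print()
        .setParallelism(1);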

Cheers,
Till

On Thu, Mar 26, 2020 at 7:11 AM Manas Kale  wrote:

> Hi Till,
> When I run the example code that you posted, the order of the three
> messages (window started, contents of window and window ended) is
> non-deterministic. This is surprising to me, as setParallelism(1) has been
> used in the pipeline - I assumed this should eliminate any form of race
> conditions for printing. What's more is that if I *remove*
> setParallelism(1) from the code, the output is deterministic and correct
> (i.e. windowStarted -> windowContents -> windowEnded).
>
> Clearly, something is wrong with my understanding. What is it?
>
> On Fri, Feb 28, 2020 at 1:58 PM Till Rohrmann 
> wrote:
>
>> Great to hear that you solved the problem. Let us know if you run into
>> any other issues.
>>
>> Cheers,
>> Till
>>
>> On Fri, Feb 28, 2020 at 8:08 AM Manas Kale  wrote:
>>
>>> Hi,
>>> This problem is solved[1]. The issue was that the BroadcastStream did
>>> not contain any watermark, which prevented watermarks for any downstream
>>> operators from advancing.
>>> I appreciate all the help.
>>> [1]
>>> https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern
>>>
>>> Thanks,
>>> Manas
>>>
>>> On Thu, Feb 27, 2020 at 4:28 PM Manas Kale 
>>> wrote:
>>>
 Hi Rafi and Till,
 Thank you for pointing out that edge case, Rafi.

 Till, I am trying to get this example working with the BroadcastState
 pattern upstream to the window operator[1]. The problem is that introducing
 the BroadcastState makes the onEventTime() *never* fire. Is the
 BroadcastState somehow eating up the watermark? Do I need to generate the
 watermark again in the KeyedBroadcastProcessFunction?

 [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49

 Thanks,
 Manas

 On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann 
 wrote:

> Hi Manas and Rafi,
>
> you are right that when using merging windows as event time session
> windows are, then Flink requires that any state the Trigger keeps is of
> type MergingState. This constraint allows that the state can be merged
> whenever two windows get merged.
>
> Rafi, you are right. With the current implementation it might happen
> that you send a wrong started window message. I think it depends on the
> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
> your watermark. If you want to be on the safe side, then I would recommend
> to use the ProcessFunction to implement the required logic. The
> ProcessFunction [1] is Flink's low level API and gives you access to state
> and timers. In it, you would need to buffer the elements and to sessionize
> them yourself, though. However, it would give you access to the
> watermark which in turn would allow you to properly handle your described
> edge case.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
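
[Editorial aside: a rough illustration of the ProcessFunction approach described above, not code from this thread. The Event type, its timestamp field and the session gap are assumptions, and late or heavily out-of-order data is not handled.]

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Assumes an Event POJO with a public long "timestamp" field.
public class SessionStartEndFunction extends KeyedProcessFunction<String, Event, String> {

    private static final long GAP_MS = 5000L; // assumed session gap

    private transient ValueState<Long> sessionEnd; // end timestamp of the currently open session

    @Override
    public void open(Configuration parameters) {
        sessionEnd = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sessionEnd", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {
        Long currentEnd = sessionEnd.value();
        if (currentEnd == null) {
            // first element of a new session
            out.collect("session started @ " + event.timestamp);
        } else {
            // extend the session: drop the old timer before registering a new one
            ctx.timerService().deleteEventTimeTimer(currentEnd);
        }
        long newEnd = event.timestamp + GAP_MS;
        if (currentEnd != null && currentEnd > newEnd) {
            newEnd = currentEnd;
        }
        sessionEnd.update(newEnd);
        ctx.timerService().registerEventTimeTimer(newEnd);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // the gap has passed without new elements for this key
        out.collect("session ended @ " + timestamp);
        sessionEnd.clear();
    }
}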
>
> Cheers,
> Till
>
> Cheers,
> Till
>
> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch 
> wrote:
>
>> I think one "edge" case which is not handled would be that the first
>> event (by event-time) arrives late, then a wrong "started-window" would 
>> be
>> reported.
>>
>> Rafi
>>
>>
>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale 
>> wrote:
>>
>>> Is the reason ValueState cannot be use because session windows are
>>> always formed by merging proto-windows of single elements, therefore a
>>> state store is needed that can handle merging. ValueState does not 
>>> provide
>>> this functionality, but a ReducingState does?
>>>
>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale 
>>> wrote:
>>>
 Hi Till,
 Thanks for your answer! You also answered the next question that I
 was about to ask "Can we share state between a Trigger and a Window?"
 Currently the only (convoluted) way to share state between two 
 operators is
 through the broadcast state pattern, right?
 Also, in your example, why can't we use 

Re: How to consume kafka from the last offset?

2020-03-26 Thread Jim Chen
Hi,
I am so sorry, it's not auto.offset.reset. The property I actually meant is
*enable.auto.commit=false*

Best Wishes!
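
[Editorial aside, not from this thread: a minimal sketch of resuming from the committed group offsets. Topic name, group id and broker address are placeholders.]

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class ResumeFromGroupOffsets {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-consumer-group");
        // When checkpointing is disabled, enable.auto.commit / auto.commit.interval.ms
        // control how offsets are committed back to Kafka; with checkpointing enabled,
        // offsets are committed on completed checkpoints instead.
        // props.setProperty("enable.auto.commit", "true");

        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props);
        // Start from the offsets committed for group.id; falls back to auto.offset.reset
        // if no committed offset exists for a partition.
        consumer.setStartFromGroupOffsets();

        env.addSource(consumer).print();
        env.execute("resume-from-group-offsets");
    }
}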

Dominik Wosiński  于2020年3月26日周四 下午4:20写道:

> Hey,
> Are You completely sure you mean *auto.offset.reset ??  *False is not
> valid setting for that AFAIK.
>
> Best,
> Dom.
>
> czw., 26 mar 2020 o 08:38 Jim Chen 
> napisał(a):
>
>> Thanks!
>>
>> I made a mistake. I forget to set the auto.offset.reset=false. It's my
>> fault.
>>
>> Dominik Wosiński  于2020年3月25日周三 下午6:49写道:
>>
>>> Hi Jim,
>>> Well, *auto.offset.reset *is only used when there is no offset saved
>>> for this *group.id * in Kafka. So, if You want to read
>>> the data from the latest record (and by latest I mean the newest here) You
>>> should assign the *group.id * that was not previously
>>> used and then FlinkKafkaConsumer should automatically fetch the last offset
>>> and start reading from that place.
>>>
>>>
>>> Best Regards,
>>> Dom.
>>>
>>> śr., 25 mar 2020 o 11:19 Jim Chen 
>>> napisał(a):
>>>
 Hi, All
   I use flink-connector-kafka-0.11 consume the Kafka0.11. In
 KafkaConsumer params, i set the group.id and auto.offset.reset. In the
 Flink1.10, set the kafkaConsumer.setStartFromGroupOffsets();
   Then, i restart the application, found the offset is not from the
 last position. Any one know where is wrong? HELP!

>>>


Re: How to consume kafka from the last offset?

2020-03-26 Thread Dominik Wosiński
Hey,
Are You completely sure you mean *auto.offset.reset ??  *False is not valid
setting for that AFAIK.

Best,
Dom.

czw., 26 mar 2020 o 08:38 Jim Chen  napisał(a):

> Thanks!
>
> I made a mistake. I forget to set the auto.offset.reset=false. It's my
> fault.
>
> Dominik Wosiński  于2020年3月25日周三 下午6:49写道:
>
>> Hi Jim,
>> Well, *auto.offset.reset *is only used when there is no offset saved for
>> this *group.id * in Kafka. So, if You want to read the
>> data from the latest record (and by latest I mean the newest here) You
>> should assign the *group.id * that was not previously
>> used and then FlinkKafkaConsumer should automatically fetch the last offset
>> and start reading from that place.
>>
>>
>> Best Regards,
>> Dom.
>>
>> śr., 25 mar 2020 o 11:19 Jim Chen 
>> napisał(a):
>>
>>> Hi, All
>>>   I use flink-connector-kafka-0.11 consume the Kafka0.11. In
>>> KafkaConsumer params, i set the group.id and auto.offset.reset. In the
>>> Flink1.10, set the kafkaConsumer.setStartFromGroupOffsets();
>>>   Then, i restart the application, found the offset is not from the last
>>> position. Any one know where is wrong? HELP!
>>>
>>


Re: Questions about flink1.10 & pyflink

2020-03-26 Thread jincheng sun
Judging from your error log, the example you ran uses PyUDFDemoConnector, i.e. it follows the blog post [1].
When that post was written, 1.10 had not been released yet, and the interfaces changed after the release, so PyUDFDemoConnector had a problem; I updated it a couple of days ago. Please update the JAR.

Also, the issue you posted is an old one that was already fixed before the 1.10 release [2], so please update the connector and test again. Keep in touch if there are further problems.

Best,
Jincheng
[1]
https://enjoyment.cool/2019/12/05/Apache-Flink-%E8%AF%B4%E9%81%93%E7%B3%BB%E5%88%97-%E5%A6%82%E4%BD%95%E5%9C%A8PyFlink-1-10%E4%B8%AD%E8%87%AA%E5%AE%9A%E4%B9%89Python-UDF/
[2] https://issues.apache.org/jira/browse/FLINK-14581


zilong xiao  于2020年3月25日周三 下午12:19写道:

> Yes, there is one key step, `source
> py36/bin/activate`, that is not reflected in the documentation. After performing that step, submitting to the yarn cluster works fine. Recently I have been looking further into the udf support in 1.10, and when trying to submit a udf job, the following exception occurs:
>
> Caused by: java.io.IOException: Cannot run program
> "xxx/pyflink-udf-runner.sh": error=2, No such file or directory
>
> The steps before submitting the job are as follows:
> 1.pip install virtualenv
> 2.virtualenv --always-copy venv
> 3.venv/bin/pip install apache-beam==2.15.0
> 4.venv/bin/pip install apache-flink
> 5.venv/bin/pip install pydemo.tar.gz
> 6.zip -r venv.zip venv
> 7.bin/flink run -pyarch venv.zip -pyexec venv.zip/venv/bin/python -py
> ./word_count_socket.py -j pydemo.jar
>
> I wonder whether you have run into a similar situation before?
>
> The full exception stack trace & the job are attached.
>
> jincheng sun  于2020年3月19日周四 下午12:08写道:
>
>> Glad to see you are using PyFlink 1.10. The core problems you are running into, and how to solve them, are as follows:
>>
>> 1. Changing what the python command points to via a shell alias has no effect, because flink does not start the Python process through the shell. So as far as flink is concerned, the local python environment is still python2.
>> 2. You can use tools such as virtualenv or conda to create a python3.5+ environment, activate it, and submit the python job inside the activated environment. For example:
>>   pip install virtualenv
>>   virtualenv --python /usr/local/bin/python3 py36
>>   source py36/bin/activate
>>   flink run -py pyflink.py
>> 3. Alternatively, you can change the symlink of the python command so that it points to python3.5+.
>>
>> You can give it a try; feel free to keep discussing by email if there are problems!
>>
>> Best,
>> 孙金城(金竹)
>>
>>
>>
>> zilong xiao  于2020年3月18日周三 下午12:14写道:
>>
>>> Hi Jinzhu, I am an IT practitioner working on real-time computing. Recently I ran into some problems while using flink1.10 &
>>> pyflink, and I was hoping to add you on DingTalk or get some other contact so we can discuss further. The problems are roughly as follows:
>>>
>>> Job submission environment:
>>> Apache-beam: 2.15.0
>>> local python: 2.7 (python3.7 is also configured, via ~/.zshrc with alias
>>> python='/usr/local/bin/python3.7')
>>> pip: 20.0.2
>>> flink: 1.10
>>>
>>> Submission command: bin/flink run -pyarch tmp/venv.zip -pyexec
>>> tmp/venv.zip/venv/bin/python3 -py word_count.py
>>>
>>> When trying to deploy the job locally in per-job mode, the following error is reported and the submission fails:
>>>
>>> RuntimeError: Python versions prior to 3.5 are not supported for PyFlink
>>> [sys.version_info(major=2, minor=7, micro=16, releaselevel='final',
>>> serial=0)].
>>>
>>>
>>> Obviously, as the official flink documentation says, flink1.10 jobs require python3.5+. I used -pyarch and
>>> -pyexec to specify the execution environment and the interpreter environment, but these two options do not seem to take effect, or have no impact, and the exception above still occurs. I followed your documentation for the whole procedure:
>>> https://enjoyment.cool/2020/01/02/Apache-Flink-%E8%AF%B4%E9%81%93%E7%B3%BB%E5%88%97-PyFlink-%E4%BD%9C%E4%B8%9A%E7%9A%84%E5%A4%9A%E7%A7%8D%E9%83%A8%E7%BD%B2%E6%A8%A1%E5%BC%8F/#more
>>> I suspect I may simply be using it the wrong way, or that there is still a hidden problem with these options? There is not much material online either, so I was hoping to discuss this with you and clear up my confusion. Looking forward to your reply.
>>>
>>


Re: When i use the Tumbling Windows, find lost some record

2020-03-26 Thread Dawid Wysakowicz
Hi,

Can you share more details on what you mean by losing some records?
Can you share what data you are ingesting, what the expected results are,
and what the actual results you are getting are? Without that it's
impossible to help you. So far your code looks rather correct.

Best,

Dawid

On 26/03/2020 08:52, Jim Chen wrote:
> Hi, All
>
>   When i use the Tumbling Windows, find lost some record. My code as
> follow
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.addSource(FlinkKafkaConsumer011..)
>     .assignTimestampsAndWatermarks(
>         new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.minutes(3)) {
>             @Override
>             public long extractTimestamp(JSONObject jsonObject) {
>                 long logTime = jsonObject.getLongValue("logTime");
>                 return logTime;
>             }
>         })
>     .keyBy(jsonObject -> {
>         return jsonObject.getString("userId");
>     })
>     .timeWindow(Time.seconds(30))
>     .process(new ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
>         public void process(String key, Context context,
>                 Iterable<JSONObject> iterable, Collector<JSONObject> collector) throws Exception {
>             SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
>             String start = sdf.format(new Date(context.window().getStart()));
>             String end = sdf.format(new Date(context.window().getEnd()));
>             System.out.println(start + "" + end);
>             for (JSONObject jsonObject : iterable) {
>                 collector.collect(jsonObject);
>             }
>         }
>     })
>     .print("");
>
> From the print result, i found lost some record in the tumbling
> window. I can't figure out, any one can help me ?




Re: Flink 1.10 JSON parsing

2020-03-26 Thread Zhenghua Gao
Hi 张宇

This looks like a bug in TypeMappingUtils when validating a field's physical type against its logical type.
I have opened an issue: https://issues.apache.org/jira/browse/FLINK-16800

*Best Regards,*
*Zhenghua Gao*


On Fri, Mar 20, 2020 at 5:28 PM 宇张  wrote:

> hi,
> Understood. Let me lay it out again:
> streamTableEnv
> .connect(
> new Kafka()
> .version("0.11")
> .topic("mysql_binlog_test")
> .startFromEarliest()
> .property("zookeeper.connect",
> "localhost:2181")
> .property("bootstrap.servers",
> "localhost:9092")
> )
> .withFormat(
> new Json()
> )
> .withSchema(
> new Schema()
> .field("business", DataTypes.STRING())
> .field("data", DataTypes.ARRAY(
> DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT()),
> DataTypes.FIELD("vendor_id",
> DataTypes.DOUBLE()),
> DataTypes.FIELD("status",
> DataTypes.BIGINT()),
> DataTypes.FIELD("create_time",
> DataTypes.BIGINT()),
> DataTypes.FIELD("tracking_number",
> DataTypes.STRING()),
> DataTypes.FIELD("invoice_no",
> DataTypes.STRING()),
> DataTypes.FIELD("parent_id",
> DataTypes.BIGINT()
> .field("database", DataTypes.STRING())
> .field("old",
> DataTypes.ARRAY(DataTypes.ROW(DataTypes.FIELD("logistics_status",
> DataTypes.DECIMAL(38,18)
> .field("table", DataTypes.STRING())
> .field("ts", DataTypes.BIGINT())
> .field("type", DataTypes.STRING())
> .field("putRowNum", DataTypes.BIGINT())
> )
> .createTemporaryTable("Test");
> Here, an exception is thrown when the sub-field inside the composite field 'old' uses the DECIMAL type; other types work fine.
> The exception:
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Type ARRAY<ROW<`logistics_status` DECIMAL(38, 18)>> of table field 'old'
> does not match with the physical type ARRAY<ROW<`logistics_status` LEGACY('DECIMAL', 'DECIMAL')>> of the 'old' field of the TableSource return
> type.
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254)
> at
>
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:157)
> at
> org.apache.flink.table.types.logical.ArrayType.accept(ArrayType.java:110)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232)
> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> at
>
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at
>
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
> at
>
> 

Re: Re: Re: Re: Asking for your advice on running pyflink on Windows; this is my first time working with flink.

2020-03-26 Thread jincheng sun
The first line of the error message says bash is not installed?

xu1990xaut  于2020年3月26日周四 下午12:12写道:

> Mr. Sun, I installed the flink package following the method in your video. But when running the demo you provided, the error below appears. I searched online for a long time and still could not solve it.
> Hoping you can give me some more pointers.
>
>
>
>
>
> 在 2020-03-25 15:47:49,"jincheng sun"  写道:
>
> Oh, PyFlink does not support windows at the moment.
>
> Best,
> Jincheng
> -
> Twitter: https://twitter.com/sunjincheng121
> -
>
>
> xu1990xaut  于2020年3月25日周三 下午2:55写道:
>
>> Thank you, Mr. Sun. That is exactly the example I used. I also noticed there are two flink packages under python, one used as "import flink" and one as "import
>> pyflink". Is it that pyflink cannot run on windows?
>> I am sure the flink package under python is installed correctly.
>> When running flink I also started start-cluster.bat (start-cluster.sh), but the pycharm console produces no result for a long time, and the cpu usage looks normal.
>> I really do not know where the problem is.
>>
>>
>>
>>
>>
>> 在 2020-03-25 14:44:25,"jincheng sun"  写道:
>>
>> The source code for the word_count example in the video above should be this:
>> https://github.com/sunjincheng121/enjoyment.code/blob/master/myPyFlink/enjoyment/word_count.py
>> After it finishes, the computed result should be written into the file sink_file = 'sink.csv'. You can print out the path of that file and check its content.
>>
>> Also, if your goal is just to learn the basics, I recommend reading [1][2]; if you want an overview of the latest state of PyFlink, see [3].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/installation.html
>> [2]
>> https://enjoyment.cool/2020/01/22/Three-Min-Series-How-PyFlink-does-ETL/#more
>> [3]
>> https://www.bilibili.com/video/BV1W7411o7Tj?from=search=14518199503613218690
>>
>> Best,
>> Jincheng
>> -
>> Twitter: https://twitter.com/sunjincheng121
>> -
>>
>>
>> xu1990xaut  于2020年3月25日周三 下午2:23写道:
>>
>>> Hello Mr. Sun, the video I watched online earlier is "[Apache Flink Advanced Tutorial] Lesson 14: The current state and future plans of the Apache Flink Python API".
>>> Today I also tried it in a virtual machine and it still does not run.
>>> I am using flink1.10 and python3.6. Please give me some pointers.
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-03-25 11:32:29,"jincheng sun"  写道:
>>>
>>> Glad to receive your email. I am not sure which video you watched, so it is hard to determine the cause of your problem. You can refer to the official documentation [1];
>>> my personal blog also has some 3-minute video and written beginner tutorials [2] that you can look through first. If there are further questions, we can keep communicating by email, or you can leave a comment on my blog!
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/tutorials/python_table_api.html
>>> [2] https://enjoyment.cool/
>>>
>>> Best,
>>> Jincheng
>>>
>>>
>>>
>>> xu1990xaut  于2020年3月24日周二 下午11:36写道:
>>>
 Hello, I watched your videos on Bilibili before, and followed along with them hands-on.
 I am using flink1.10; when installing with pip I simply ran pip install apache-flink, which defaulted to version 1.10.
 Then, when I run the word-count script in pycharm, it never produces a result and never reports an error. What could be the reason?
 I also installed the jdk, and visiting the flink 8081 port in the browser does bring up the web UI. This is my first time working with flink; I searched for this problem online
 but never found an answer. Please give me some pointers. Thank you very much.




>>>
>>>
>>>
>>>
>>
>>
>>
>>
>
>
>
>


Subscribe to the user mailing list

2020-03-26 Thread zhanglianzhg
Hello!
I would like to subscribe to the user mailing list, to follow and help answer user questions. Thanks!!

When i use the Tumbling Windows, find lost some record

2020-03-26 Thread Jim Chen
Hi, All

  When i use the Tumbling Windows, find lost some record. My code as follow

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.addSource(FlinkKafkaConsumer011..)
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.minutes(3)) {
            @Override
            public long extractTimestamp(JSONObject jsonObject) {
                long logTime = jsonObject.getLongValue("logTime");
                return logTime;
            }
        })
    .keyBy(jsonObject -> {
        return jsonObject.getString("userId");
    })
    .timeWindow(Time.seconds(30))
    .process(new ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
        public void process(String key, Context context,
                Iterable<JSONObject> iterable, Collector<JSONObject> collector) throws Exception {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String start = sdf.format(new Date(context.window().getStart()));
            String end = sdf.format(new Date(context.window().getEnd()));
            System.out.println(start + "" + end);
            for (JSONObject jsonObject : iterable) {
                collector.collect(jsonObject);
            }
        }
    })
    .print("");

From the print result, i found lost some record in the tumbling window. I
can't figure out, any one can help me ?
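
[Editorial aside, not a suggestion made in this thread: one way to check whether the missing records are being dropped as late data (i.e. arriving behind the 3-minute watermark) is to route late elements to a side output. A rough sketch; sourceStream and MyProcessWindowFunction are placeholders for the pieces above.]

OutputTag<JSONObject> lateTag = new OutputTag<JSONObject>("late-events") {};

SingleOutputStreamOperator<JSONObject> windowed = sourceStream
        .keyBy(jsonObject -> jsonObject.getString("userId"))
        .timeWindow(Time.seconds(30))
        .sideOutputLateData(lateTag)          // capture elements dropped as late
        .process(new MyProcessWindowFunction());

// Anything printed here arrived after the watermark had already passed its window.
windowed.getSideOutput(lateTag).print("late");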


Re: How to consume kafka from the last offset?

2020-03-26 Thread Jim Chen
Thanks!

I made a mistake. I forget to set the auto.offset.reset=false. It's my
fault.

Dominik Wosiński  于2020年3月25日周三 下午6:49写道:

> Hi Jim,
> Well, *auto.offset.reset *is only used when there is no offset saved for
> this *group.id * in Kafka. So, if You want to read the
> data from the latest record (and by latest I mean the newest here) You
> should assign the *group.id * that was not previously
> used and then FlinkKafkaConsumer should automatically fetch the last offset
> and start reading from that place.
>
>
> Best Regards,
> Dom.
>
> śr., 25 mar 2020 o 11:19 Jim Chen 
> napisał(a):
>
>> Hi, All
>>   I use flink-connector-kafka-0.11 consume the Kafka0.11. In
>> KafkaConsumer params, i set the group.id and auto.offset.reset. In the
>> Flink1.10, set the kafkaConsumer.setStartFromGroupOffsets();
>>   Then, i restart the application, found the offset is not from the last
>> position. Any one know where is wrong? HELP!
>>
>


Usage of the NetworkBufferPool

2020-03-26 Thread yanggang_it_job
Hi:

I observed that the value of flink_taskmanager_Status_Shuffle_Netty_AvailableMemorySegments is quite large, which means the NetworkBufferPool still has plenty of capacity, yet my job still triggered a backpressure alert.
Could anyone tell me why that is?


Re: Emit message at start and end of event time session window

2020-03-26 Thread Manas Kale
Hi Till,
When I run the example code that you posted, the order of the three
messages (window started, contents of window and window ended) is
non-deterministic. This is surprising to me, as setParallelism(1) has been
used in the pipeline - I assumed this should eliminate any form of race
conditions for printing. What's more is that if I *remove*
setParallelism(1) from the code, the output is deterministic and correct
(i.e. windowStarted -> windowContents -> windowEnded).

Clearly, something is wrong with my understanding. What is it?

On Fri, Feb 28, 2020 at 1:58 PM Till Rohrmann  wrote:

> Great to hear that you solved the problem. Let us know if you run into any
> other issues.
>
> Cheers,
> Till
>
> On Fri, Feb 28, 2020 at 8:08 AM Manas Kale  wrote:
>
>> Hi,
>> This problem is solved[1]. The issue was that the BroadcastStream did not
>> contain any watermark, which prevented watermarks for any downstream
>> operators from advancing.
>> I appreciate all the help.
>> [1]
>> https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern
>>
>> Thanks,
>> Manas
>>
>> On Thu, Feb 27, 2020 at 4:28 PM Manas Kale  wrote:
>>
>>> Hi Rafi and Till,
>>> Thank you for pointing out that edge case, Rafi.
>>>
>>> Till, I am trying to get this example working with the BroadcastState
>>> pattern upstream to the window operator[1]. The problem is that introducing
>>> the BroadcastState makes the onEventTime() *never* fire. Is the
>>> BroadcastState somehow eating up the watermark? Do I need to generate the
>>> watermark again in the KeyedBroadcastProcessFunction?
>>>
>>> [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>>>
>>> Thanks,
>>> Manas
>>>
>>> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann 
>>> wrote:
>>>
 Hi Manas and Rafi,

 you are right that when using merging windows as event time session
 windows are, then Flink requires that any state the Trigger keeps is of
 type MergingState. This constraint allows that the state can be merged
 whenever two windows get merged.

 Rafi, you are right. With the current implementation it might happen
 that you send a wrong started window message. I think it depends on the
 MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
 your watermark. If you want to be on the safe side, then I would recommend
 to use the ProcessFunction to implement the required logic. The
 ProcessFunction [1] is Flink's low level API and gives you access to state
 and timers. In it, you would need to buffer the elements and to sessionize
 them yourself, though. However, it would give you access to the
 watermark which in turn would allow you to properly handle your described
 edge case.

 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html

 Cheers,
 Till

 Cheers,
 Till

 On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch 
 wrote:

> I think one "edge" case which is not handled would be that the first
> event (by event-time) arrives late, then a wrong "started-window" would be
> reported.
>
> Rafi
>
>
> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale 
> wrote:
>
>> Is the reason ValueState cannot be use because session windows are
>> always formed by merging proto-windows of single elements, therefore a
>> state store is needed that can handle merging. ValueState does not 
>> provide
>> this functionality, but a ReducingState does?
>>
>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale 
>> wrote:
>>
>>> Hi Till,
>>> Thanks for your answer! You also answered the next question that I
>>> was about to ask "Can we share state between a Trigger and a Window?"
>>> Currently the only (convoluted) way to share state between two 
>>> operators is
>>> through the broadcast state pattern, right?
>>> Also, in your example, why can't we use a
>>> ValueStateDescriptor in the Trigger? I tried using it in my own
>>> example but it  I am not able to  call the mergePartitionedState() 
>>> method
>>> on a ValueStateDescriptor.
>>>
>>> Regards,
>>> Manas
>>>
>>>
>>>
>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann 
>>> wrote:
>>>
 Hi Manas,

 you can implement something like this with a bit of trigger magic.
 What you need to do is to define your own trigger implementation which
 keeps state to remember whether it has triggered the "started window"
 message or not. In the stateful window function you would need to do
 something similar. The first call could trigger the output of "window
 started" and any subsequent call will trigger the evaluation of the 
 window.
 It would have been a bit easier if the trigger and the window process