flink1.12读mysql问题

2021-02-24 Thread 酷酷的浑蛋
'scan.partition.column'='id',

'scan.partition.num'='15',

'scan.partition.lower-bound'='1',

'scan.partition.upper-bound'='680994'


我设置了上面这几个参数给source mysql分区,但是并没有生效,真实情况是只有一个task读的mysql全量数据

Re: java.io.IOException: Could not create storage directory for BLOB store in '/tmp'

2021-02-24 Thread wheatdog liou
Turns out the disk used by docker for mac is full. I followed the operation
on docker site [1] and everything is fine.

[1]
https://docs.docker.com/docker-for-mac/space/#delete-unnecessary-containers-and-images

wheatdog liou  於 2021年2月19日 週五 上午10:47寫道:

> Hi, I am new to Flink and was following Flink with docker-compose
> 
>  and
> encounter this error. I used the session-cluster docker-compose.yml
> template from the document:
>
> version: "2.2"
> services:
>
>   jobmanager:
>
> image: flink:1.12.0-scala_2.11
>
> ports:
>
>   - "8081:8081"
>
> command: jobmanager
>
> environment:
>
>   - |
>
> FLINK_PROPERTIES=
>
> jobmanager.rpc.address: jobmanager
>
>
>
>   taskmanager:
>
> image: flink:1.12.0-scala_2.11
>
> depends_on:
>
>   - jobmanager
>
> command: taskmanager
>
> scale: 1
>
> environment:
>
>   - |
>
> FLINK_PROPERTIES=
>
> jobmanager.rpc.address: jobmanager
>
> taskmanager.numberOfTaskSlots: 2
>
> jobmanager and taskmanager both got the same error at initialization:
>
> taskmanager_1  | 2021-02-19 02:30:36,940 INFO
>  org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting
> RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService
> at
> akka://flink-metrics/user/MetricQueryService_35cd708df0ef5a7b90ed8839b105eaa5
> .
> taskmanager_1  | 2021-02-19 02:30:36,978 ERROR
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - TaskManager
> initialization failed.
> taskmanager_1  | java.io.IOException: Could not create storage directory
> for BLOB store in '/tmp'.
> taskmanager_1  | at
> org.apache.flink.runtime.blob.BlobUtils.initLocalStorageDirectory(BlobUtils.java:154)
> taskmanager_1  | at
> org.apache.flink.runtime.blob.AbstractBlobCache.(AbstractBlobCache.java:106)
> taskmanager_1  | at
> org.apache.flink.runtime.blob.PermanentBlobCache.(PermanentBlobCache.java:104)
> taskmanager_1  | at
> org.apache.flink.runtime.blob.BlobCacheService.(BlobCacheService.java:58)
> taskmanager_1  | at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.(TaskManagerRunner.java:153)
> taskmanager_1  | at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:327)
> taskmanager_1  | at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerSecurely$3(TaskManagerRunner.java:351)
> taskmanager_1  | at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> taskmanager_1  | at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:350)
> taskmanager_1  | at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:335)
> taskmanager_1  | at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:306)
>
> jobmanager_1   | 2021-02-19 02:30:25,739 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Could not
> start cluster entrypoint StandaloneSessionClusterEntrypoint.
> jobmanager_1   |
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.
> jobmanager_1   | at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:189)
> jobmanager_1   | at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:520)
> jobmanager_1   | at
> org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:64)
> jobmanager_1   | Caused by: java.io.IOException: Could not create storage
> directory for BLOB store in '/tmp'.
> jobmanager_1   | at
> org.apache.flink.runtime.blob.BlobUtils.initLocalStorageDirectory(BlobUtils.java:154)
> jobmanager_1   | at
> org.apache.flink.runtime.blob.BlobServer.(BlobServer.java:140)
> jobmanager_1   | at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:266)
> jobmanager_1   | at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:209)
> jobmanager_1   | at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:171)
> jobmanager_1   | at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> jobmanager_1   | at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:170)
> jobmanager_1   | ... 2 more
>
> Any idea how I can solve it? Thanks!
>


Flink SQL UDF 如何自定义Metrics

2021-02-24 Thread xingoo
HI,
 如题,想要在Flink
SQL中通过自定义UDF增加指标,从而实现自定义告警。那么如何在UDF中获取到RuntimeContext从而修改Metrics呢?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Get JobId and JobManager RPC Address in RichMapFunction executed in TaskManager

2021-02-24 Thread Sandeep khanzode
Hello,

I am deploying a standalone-job cluster (cluster with a single Job and Task 
Manager instance instantiated with a —job-classname and —job-id).

I have map/flatmap/process functions being executed in the various stream 
functions in the Taskmanager for which I need access to the Job Id and the 
JobManager RPC address. How can I get access to these variables? What in-built 
environment/context/configuration functions exist for this purpose?

I need these two variables for queryable-state.

Thanks

?????? Flink ????????join

2021-02-24 Thread Suhan
benchao??joinrocketmqflink??kafka
 + rocket mq
??flink??





----
??: 
   "user-zh"



Re: Flink custom trigger use case

2021-02-24 Thread Diwakar Jha
Hello,

I tried using *processWindowFunction* since it gives access to
*globalstate* through
*context*. My question is, Is it possible to discard single events inside
*process* function of *processWindowFunction* just like *onElements* of
triggers?
For my use case it seems that trigger is not sufficient but i want to know
how i can do it using processWindowFunction. Appreciate any pointers.

Thanks!

On Wed, Feb 24, 2021 at 10:50 AM Diwakar Jha  wrote:

> Hi Arvid,
>
> Thanks. I tried FIRE instead of FIRE_AND_PURGE and it introduced
> duplicates though the result is still the same i.e record 1 is fired both
> at the start and the end of the window. so for every window i see the first
> event of the window is coming twice in the output.
>
> I'm trying to explain again the desired behaviour, hopefully it becomes
> clear.
>
> all the records have the same key.
> current output.
>
>> record 1 : first event in the window-1 : fired
>> record 2 : last event in the window-1 : fired
>> record 3 : first event in the window-2 : fired. [this should not have
>> fired since it has the same Key as all other records.]
>> record 4, record 5 : - 2 events in the window-2 : fired.
>>
>
> expected output.
>
>> record 1 : first event in the window-1 : fired
>> record 2 : last event in the window-1 : fired
>> record 3,4,5 : all event in the window-2 : fired
>
>
> I think my problem is to store KeyBy values between windows. For example,
> I want to retain the KeyBy for 1 day. In that case, record 1 is fired
> instantly, all other records (of same key as record1) are always grouped in
> each window (say 1 min) instead of firing instantly.
>
> Thanks!
>
> On Wed, Feb 24, 2021 at 6:19 AM Arvid Heise  wrote:
>
>> Hi Diwakar,
>>
>> the issue is that you fire_and_purge the state, you should just FIRE on
>> the first element (or else you lose the information that you received the
>> element already).
>> You'd use FIRE_AND_PURGE on the last element though.
>>
>> On Wed, Feb 24, 2021 at 7:16 AM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Diwakar,
>>>
>>> I'm not sure I fully understand your question.
>>> If event handling in one window depends on some other windows than
>>> TriggerContext.getPartitionedState can not be used. Triggers don't have
>>> access to the global state (only to key-window scoped state).
>>> If that's what you want then please consider ProcessWindowFunction [1]
>>> where you can use context.globalState() in your process function.
>>>
>>> [1]
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha 
>>> wrote:
>>>

 Hello,

 I'm trying to use a custom trigger for one of my use case. I have a
 basic logic (as shown below) of using keyBy on the input stream and using a
 window of 1 min.

 .keyBy()
 .window(TumblingEventTimeWindows.of(Time.seconds(60)))
 .trigger(new CustomTrigger())
 .aggregate(Input.getAggregationFunction(), new
 AggregationProcessingWindow());


 My custom trigger is expected to fire the first event of the keyBy
 instantly and any subsequent events should be aggregated in the window.

 .trigger(new Trigger() {
> @Override
> public TriggerResult onElement(Record record, long l, TimeWindow
> timeWindow, TriggerContext triggerContext) throws Exception {
> ValueState firstSeen =
> triggerContext.getPartitionedState(firstSceenDescriptor);
> if(firstSeen.value() == null) {
> firstSeen.update(true);
> // fire trigger to early evaluate window and purge that event.
> return TriggerResult.FIRE_AND_PURGE;
> }
> // Continue. Do not evaluate window per element
> return TriggerResult.CONTINUE;
> }
> @Override
> public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
> TriggerContext triggerContext) throws Exception {
> // final evaluation and purge window state
> return TriggerResult.FIRE_AND_PURGE;
> }
> @Override
> public TriggerResult onEventTime(long l, TimeWindow timeWindow,
> TriggerContext triggerContext) throws Exception {
> return TriggerResult.CONTINUE;
> }
> @Override
> public void clear(TimeWindow timeWindow, TriggerContext
> triggerContext) throws Exception {
>
> }
> })




 Currently, I see (for each window and same key) the first event of the
 window is always fired. But I want to see this happening for only the first
 window and for the subsequent window it should aggregate all the events and
 then fire.

 Example : all the records have the same key.
 current output.
 record 1 : first event in the window-1 : fired record 2 : last event in
 the window-1 : fired record 3 : first event in the window-2 : fired record
 4, record 5 : - 2 events in the window-2 : fired.

 

Re: Flink jobs organization and maintainability

2021-02-24 Thread yidan zhao
I used a yarm config file to describe my jobs, and using 'start xxxJobName'
to start the job which is implemented by shell scripts.

Arvid Heise  于2021年2月24日周三 下午10:09写道:

> If you have many similar jobs, they should be in the same repo (especially
> if they have the same development cycle).
>
> First, how different are the jobs?
> A) If they are very similar, go with just one job and configure it
> differently for each application. Then you can use different deployments of
> the same jar with different parameters/config. If you have deployment by
> code, then you will have all deployment files in some special deploy
> directory on root.
> B) If they are somewhat similar, go with one maven/gradle project having
> several modules. Shared code should go into a *common* module. You should
> have a deploy directory per module.
>
> Note that I'd recommend Table API to implement the jobs as you can use the
> simpler Option A much longer. You can easily it configurable to: a) join
> from multiple sources, b) group by a varying number of fields, c) have
> different aggregation functions, d) use different transformation...
>
> On Tue, Feb 23, 2021 at 10:56 PM Sweta Kalakuntla <
> skalakun...@bandwidth.com> wrote:
>
>> Hi,
>>
>> I am going to have to implement many similar jobs. I need guidance and
>> examples that you may have for organizing them in the Git repository
>> without having to have one repo per job.
>>
>> Thanks,
>> SK
>>
>> --
>>
>>
>>


Re: Flink 1.12.1 example applications failing on a single node yarn cluster

2021-02-24 Thread Debraj Manna
The same has been asked in StackOverflow

also. Any suggestions here?

On Wed, Feb 24, 2021 at 10:25 PM Debraj Manna 
wrote:

> I am trying out flink example as explained in flink docs
> 
>  in
> a single node yarn cluster
> 
> .
>
> On executing
>
> ubuntu@vrni-platform:~/build-target/flink$ ./bin/flink run-application -t
> yarn-application ./examples/streaming/TopSpeedWindowing.jar
>
> It is failing with the below errors
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
> deploy Yarn Application Cluster
> at 
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:465)
> at 
> org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)
> at 
> org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:213)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1061)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
> Caused by: 
> org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN 
> application unexpectedly switched to state FAILED during deployment.
> Diagnostics from YARN: Application application_1614159836384_0045 failed 1 
> times (global limit =2; local limit is =1) due to AM Container for 
> appattempt_1614159836384_0045_01 exited with  exitCode: -1000
> Failing this attempt.Diagnostics: [2021-02-24 16:19:39.409]File 
> file:/home/ubuntu/.flink/application_1614159836384_0045/flink-dist_2.12-1.12.1.jar
>  does not exist
> java.io.FileNotFoundException: File 
> file:/home/ubuntu/.flink/application_1614159836384_0045/flink-dist_2.12-1.12.1.jar
>  does not exist
> at 
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:641)
> at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:867)
> at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:631)
> at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:442)
> at 
> org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:269)
> at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:67)
> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:414)
> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:411)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:411)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:242)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:235)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:223)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> I have made the log level DEBUG and I do see that flink-dist_2.12-1.12.1.jar 
> is getting copied to /home/ubuntu/.flink/application_1614159836384_0045.
>
> 2021-02-24 16:19:37,768 DEBUG 
> org.apache.flink.yarn.YarnApplicationFileUploader[] - Got 
> modification time 1614183577000 from remote path 
> file:/home/ubuntu/.flink/application_1614159836384_0045/TopSpeedWindowing.jar
> 2021-02-24 16:19:37,769 DEBUG 
> org.apache.flink.yarn.YarnApplicationFileUploader[] - Copying 
> from 

apache-flink +spring boot - logs related to spring boot application start up not printed in file in flink 1.12

2021-02-24 Thread Abhishek Shukla
I was getting bean creation logs and spring boot start up logs in Flink 1.9
with flink1.9_log4j-cli.properties (attached)



#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.


log4j.rootLogger=INFO, file


# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=true
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{-MM-dd
HH:mm:ss,SSS} %-5p %-60c %x - %m%n


# Log output from org.apache.flink.yarn to the console. This is used by the
# CliFrontend class when using a per-job YARN
cluster.log4j.logger.org.apache.flink.yarn=INFO, console
log4j.logger.org.apache.flink.yarn.cli.FlinkYarnSessionCli=INFO, console
log4j.logger.org.apache.hadoop=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{-MM-dd
HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# suppress the warning that hadoop native libraries are not loaded
(irrelevant for the client)
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=OFF

# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR

but after updating to Flink 1.12.1 those logs are not getting printed in
log file attaching flink1.12_log4j-cli.properties



#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.


rootLogger.level = INFO
rootLogger.appenderRef.file.ref = FileAppender

# Log all infos in the given fileappender.file.name = FileAppender
appender.file.type = FILE
appender.file.append = true
appender.file.fileName = ${sys:log.file}
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
logger.flink.name = org.apache.flink
logger.flink.level = INFO

# Log output from org.apache.flink.yarn to the console. This is used by the
# CliFrontend class when using a per-job YARN cluster.logger.yarn.name
= org.apache.flink.yarn
logger.yarn.level = INFO
logger.yarn.appenderRef.console.ref =
ConsoleAppenderlogger.yarncli.name =
org.apache.flink.yarn.cli.FlinkYarnSessionCli
logger.yarncli.level = INFO
logger.yarncli.appenderRef.console.ref =
ConsoleAppenderlogger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.hadoop.appenderRef.console.ref = ConsoleAppender

# Log output from org.apache.flink.kubernetes to the
console.logger.kubernetes.name = org.apache.flink.kubernetes
logger.kubernetes.level = INFO
logger.kubernetes.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p
%-60c %x - %m%n

# suppress the warning that hadoop native libraries are not loaded
(irrelevant for the client)logger.hadoopnative.name =
org.apache.hadoop.util.NativeCodeLoader
logger.hadoopnative.level = OFF

# Suppress the irrelevant (wrong) warnings from the Netty channel
handlerlogger.netty.name =

Re: Flink Statefun TTL

2021-02-24 Thread Tzu-Li (Gordon) Tai
On Thu, Feb 25, 2021 at 12:06 PM Timothy Bess  wrote:

> Hi Gordon,
>
> Ah so when it said "all registered state" that means all state keys
> defined in the "module.yaml", not all state for all function instances. So
> the expiration has always been _per_ instance then and not across all
> instances of a function.
>

Exactly! Expiration happens individually for each function instance per
declared state.


>
> Thanks for the heads up, that sounds like a good change! I definitely like
> the idea of putting more configuration into the SDK so that there's not two
> sources that have to be kept up to date. Would be neat if eventually the
> SDK just hosts some "/spec" endpoint that serves a list of functions and
> all their configuration options to Statefun on boot.
>

> Btw, I ended up also making a Scala replica of my Haskell library to use
> at work (some of my examples in the microsite are a bit out of date, need
> to revisit that):
> https://github.com/BlueChipFinancial/flink-statefun4s
>
> I know it seems weird to not use an embedded function, but it keeps us
> from having to deal with mismatched Scala versions since Flink is still on
> 2.12, and generally reduces friction using stuff in the Scala Cats
> ecosystem.
>

Really cool to hear about your efforts on a Scala SDK!

I would not say it is weird to implement a Scala SDK for remote functions.
In fact, with the changes upcoming in 3.0, the community is doubling down
on remote as the primary deployment mode for functions, and would like to
have a wider array of supported language SDKs. There's actually a remote
Java SDK that was just merged to master and to be released in 3.0 [1].

Cheers,
Gordon

[1] https://github.com/apache/flink-statefun/tree/master/statefun-sdk-java


> Thanks,
>
> Tim
>
> On Wed, Feb 24, 2021 at 11:49 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Timothy,
>>
>> Starting from StateFun 2.2.x, in the module.yaml file, you can set for
>> each individual state of a function an "expireMode" field, which values can
>> be either "after-invoke" or "after-write". For example:
>>
>> ```
>> - function:
>> meta:
>>   ...
>> spec:
>>   states:
>> - name: state-1
>>   expireMode: after-write
>>   expireAfter: 1min
>> - name: state-2
>>   expireMode: after-invoke
>>   expireAfter: 5sec
>> ```
>>
>> In earlier versions, expireMode can not be individually set for each
>> state. This is more flexible with 2.2.x.
>>
>> As a side note which is somewhat related, all state related
>> configurations will be removed from the module.yaml, instead to be defined
>> by the language SDKs starting from StateFun 3.0.
>> This opens up even more flexibility, such as zero-downtime upgrades of
>> remote functions which allows adding / removing state declarations without
>> restarting the StateFun cluster.
>> We're planning to reach out to the language SDK developers we know of
>> (which includes you for the Haskell SDK ;) ) soon on a briefing of this
>> change, as there is a change in the remote invocation protocol and will
>> require existing SDKs to be updated in order to work with StateFun 3.0.
>>
>> Cheers,
>> Gordon
>>
>> On Wed, Feb 24, 2021 at 11:00 PM Timothy Bess  wrote:
>>
>>> Hey,
>>>
>>> I noticed that the Flink Statefun 2.1.0 release notes had this snippet
>>> with regards to TTL:
>>>
>>> Note: The state expiration mode for remote functions is currently
 restricted to AFTER_READ_AND_WRITE, and the actual TTL being set is the
 longest duration across all registered state, not for each individual state
 entry. This is planned to be improved in upcoming releases (FLINK-17954).

>>>
>>> I noticed that the Ticket and PR for this have been closed with a
>>> reference to commit "289c30e8cdb54d2504ee47a57858a1d179f9a540". Does this
>>> mean that if I upgrade to 2.2.2 and set an expiration in my modules.yaml it
>>> is now "per function id" rather than across instances of said function?
>>>
>>> Thanks,
>>>
>>> Tim
>>>
>>


flink sql tumble window 时区问题

2021-02-24 Thread xuhaiLong
hi all:

使用flink sql发现一个时区问题,在flink 1.11.3,flink 1.10 都有发现。


使用eventtime,datestream 转换为table,对times字段使用 rowtime。数据为 161421840,执行完rowtime 
后变成 161418960 直接就少了8小时,导致后续的开窗都有问题。


代码参考:https://paste.ubuntu.com/p/xYpWNrR9MT/





Re: 关于flinksql 与维表mysql的关联问题

2021-02-24 Thread LakeShen
Hi ,
延迟维表关联这个特性我觉得还是一个比较通用的特性,目前我们考虑借助 Timer 来实现的,社区如果有这个功能的话,我觉得对于 Flink
使用方会有很大帮助的。
我看社区有这样的一个 JIRA 再跟踪了[1],我会持续关注。

[1] https://issues.apache.org/jira/browse/FLINK-19063

Best,
LakeShen

小屁孩 <932460...@qq.com> 于2020年6月8日周一 上午9:28写道:

> hi,目前我就是这样做的 数据在启动时会有数据先后到来的问题
>
>
>
>
> --原始邮件--
> 发件人:"Px New"<15701181132mr@gmail.com;
> 发送时间:2020年6月7日(星期天) 晚上7:02
> 收件人:"user-zh"
> 主题:Re: 关于flinksql 与维表mysql的关联问题
>
>
>
> 好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作?
>
> 1048262223 <1048262...@qq.com于2020年6月7日 周日下午3:57写道:
>
>  Hi
> 
> 
>  可以使用open + broadcast的方式解决~
> 
> 
>  Best,
>  Yichao Yang
> 
> 
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:nbsp;"Px New"<15701181132mr@gmail.comgt;;
>  发送时间:nbsp;2020年6月6日(星期六) 上午9:50
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: 关于flinksql 与维表mysql的关联问题
> 
> 
> 
>  Hi ,我有一个相关操作的一疑问.
>  疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?
> 
>  Michael Ran  
>  gt; 放到open 方法里面可以吗?
>  gt; 在 2020-06-04 14:15:05,"小屁孩" <932460...@qq.comgt; 写道:
>  gt; gt;dear:amp;nbsp; amp;nbsp;
> 我有个问题想请教下,关于flinksql与mysql维表关联
>  关于mysql更新的问题
>  gt;
> 
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
>  gt;


Re: Flink 维表延迟join

2021-02-24 Thread LakeShen
Hi ,
延迟维表关联这个特性我觉得还是一个比较通用的特性,目前我们这边业务方也有维表延迟关联的述求,比如 HBase 维表的数据关联。
当前的场景是,有一张实时维表,消费 mysql binlog,然后业务方 etl 后,输出到
HBase。然后业务方还有另外一个流,会去关联这张维表,由于存在某些 rowkey
的数据还没有写入到 hbase,而另外一条流就去关联 HBase,却没有数据。所以业务方希望有个延迟维表关联功能,比如 10
分钟后在进行关联,目前我们考虑借助 Timer 来实现的,
社区如果有这个功能的话,我觉得对于 Flink 使用方会有很大帮助的。我看社区有这样的一个 JIRA 再跟踪了[1],我会持续关注。

[1] https://issues.apache.org/jira/browse/FLINK-19063

Best,
LakeShen


郑斌斌  于2020年8月27日周四 上午9:23写道:

> 小伙伴们:
>
> 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。
> 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。
>
> Thanks


Re: Flink Statefun TTL

2021-02-24 Thread Timothy Bess
Hi Gordon,

Ah so when it said "all registered state" that means all state keys defined
in the "module.yaml", not all state for all function instances. So the
expiration has always been _per_ instance then and not across all instances
of a function.

Thanks for the heads up, that sounds like a good change! I definitely like
the idea of putting more configuration into the SDK so that there's not two
sources that have to be kept up to date. Would be neat if eventually the
SDK just hosts some "/spec" endpoint that serves a list of functions and
all their configuration options to Statefun on boot.

Btw, I ended up also making a Scala replica of my Haskell library to use at
work (some of my examples in the microsite are a bit out of date, need to
revisit that):
https://github.com/BlueChipFinancial/flink-statefun4s

I know it seems weird to not use an embedded function, but it keeps us from
having to deal with mismatched Scala versions since Flink is still on 2.12,
and generally reduces friction using stuff in the Scala Cats ecosystem.

Thanks,

Tim

On Wed, Feb 24, 2021 at 11:49 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi Timothy,
>
> Starting from StateFun 2.2.x, in the module.yaml file, you can set for
> each individual state of a function an "expireMode" field, which values can
> be either "after-invoke" or "after-write". For example:
>
> ```
> - function:
> meta:
>   ...
> spec:
>   states:
> - name: state-1
>   expireMode: after-write
>   expireAfter: 1min
> - name: state-2
>   expireMode: after-invoke
>   expireAfter: 5sec
> ```
>
> In earlier versions, expireMode can not be individually set for each
> state. This is more flexible with 2.2.x.
>
> As a side note which is somewhat related, all state related configurations
> will be removed from the module.yaml, instead to be defined by the language
> SDKs starting from StateFun 3.0.
> This opens up even more flexibility, such as zero-downtime upgrades of
> remote functions which allows adding / removing state declarations without
> restarting the StateFun cluster.
> We're planning to reach out to the language SDK developers we know of
> (which includes you for the Haskell SDK ;) ) soon on a briefing of this
> change, as there is a change in the remote invocation protocol and will
> require existing SDKs to be updated in order to work with StateFun 3.0.
>
> Cheers,
> Gordon
>
> On Wed, Feb 24, 2021 at 11:00 PM Timothy Bess  wrote:
>
>> Hey,
>>
>> I noticed that the Flink Statefun 2.1.0 release notes had this snippet
>> with regards to TTL:
>>
>> Note: The state expiration mode for remote functions is currently
>>> restricted to AFTER_READ_AND_WRITE, and the actual TTL being set is the
>>> longest duration across all registered state, not for each individual state
>>> entry. This is planned to be improved in upcoming releases (FLINK-17954).
>>>
>>
>> I noticed that the Ticket and PR for this have been closed with a
>> reference to commit "289c30e8cdb54d2504ee47a57858a1d179f9a540". Does this
>> mean that if I upgrade to 2.2.2 and set an expiration in my modules.yaml it
>> is now "per function id" rather than across instances of said function?
>>
>> Thanks,
>>
>> Tim
>>
>


Re: Flink 维表延迟join

2021-02-24 Thread LakeShen
Hi,

Benchao,这种发送到另外一个 topic
,再来关联,虽然感觉可以减缓这种没有关联到维表数据对于下游业务的影响,不过还是很难控制到具体的延缓时间,我们现在也考虑怎么优雅的实现。

Benchao Li  于2020年8月27日周四 上午10:08写道:

> Hi,
>
> 我们也遇到过类似场景,我们的做法是修改了一下维表Join算子,让它来支持延迟join。
>
> 其实还有个思路,你可以把这种没有join到的数据发送到另外一个topic,然后再消费回来继续join。
>
> 郑斌斌  于2020年8月27日周四 上午9:23写道:
>
> > 小伙伴们:
> >
> >
> 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。
> > 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。
> >
> > Thanks
>
>
>
> --
>
> Best,
> Benchao Li
>


关于Flink五分钟统计导致五分钟的CPU尖刺问题

2021-02-24 Thread yidan zhao
如题,线上集群很多任务依赖五分钟粒度。导致CPU存在五分钟一波的尖刺,正常情况CPU利用为40%,尖刺时候达到90%+。
大家一般类似情况都怎么解决呢?


flink sql 并发数问题

2021-02-24 Thread Jeff
hi all,


用flink sql消费kafka数据,有效并发数是由kafka分区数来决定的,请问有什么方法提高有效并发数吗? 因为有一个UDF是请求python 
http服务,速度不快,有没有方法单独提高这一块的并发数呢?  

关于standalone集群jobmanager在操作时候web-ui卡顿的问题

2021-02-24 Thread yidan zhao
如题,standalone集群,当有集群操作的时候容易卡顿。
集群操作指:提交任务、触发保存点并停止任务、主动触发保存点(不严重)等。
这些操作执行时候web-ui回出现卡顿转圈,大多数情况转圈10-30s内会结束恢复正常,偶尔情况下会出现jobmanager进程失败。

如上,一个是希望大佬们帮忙分析下原因?
目前Jobmanager和Taskmanager是相同机器部署,20台机器,20个Jm和Tm进程。
不清楚卡顿和JM的“内存”是否有关,还是主要CPU? 我JM目前内存10G+,TM内存70G+。
我当前计划想把JM数量降低,不搞20个,本身也用不着。想着单独出来JM部署,这样可以少部署,但提升JM的内存。当然不清楚内存影响大不大。如果是CPU影响大,可能还需要单独部署JM的机器不部署TM这样。


Re: 社区有人实现过Flink的MongodbSource吗?

2021-02-24 Thread Paul Lam
Hi Even,

我没有实际使用过,不过根据 Debezium 文档 [1] 和我了解到的用户反馈,存量读取和实时增量读取都是支持的。

[1] 
https://debezium.io/documentation/reference/connectors/mongodb.html#mongodb-streaming-changes

Best,
Paul Lam

> 2021年2月24日 17:08,Evan  写道:
> 
> 好的,十分感谢,我调研一下,之前网上搜了一些资料,实现的只能批量读取,读完程序就停止了,不能一直实时的增量读取
> 
> 
> 
> 
> 发件人: Paul Lam
> 发送时间: 2021-02-24 17:03
> 收件人: user-zh
> 主题: Re: 社区有人实现过Flink的MongodbSource吗?
> Hi,
> 
> Debezium 支持 MongoDB CDC[1],可以了解下。
> 
> [1] https://debezium.io/documentation/reference/connectors/mongodb.html
> 
> Best,
> Paul Lam
> 
>> 2021年2月24日 16:23,Evan  写道:
>> 
>> 
>> 有人完整的实现Flink的MongodbSource吗
>> 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化
>> 
>> 
> 



Re: 社区有人实现过Flink的MongodbSource吗?

2021-02-24 Thread Paul Lam
据我所知暂时没有。不过我所在公司内部有很多 mongodb 使用,因此我们也有计划开发 mongodb connector(主要是作为 sink)。
之前因为等 FLIP-143 新接口搁置了一下计划,最近可以重启。

如果顺利的话,我们预计放到 bahir 上或贡献给 mongo 社区(考虑到 flink 社区现在对新增 connector 到主 repo 比较谨慎)。

Best,
Paul Lam

> 2021年2月24日 18:16,林影  写道:
> 
> 请问flink的mongodb connector这块后续有计划吗
> 
> Evan  于2021年2月24日周三 下午5:08写道:
> 
>> 好的,十分感谢,我调研一下,之前网上搜了一些资料,实现的只能批量读取,读完程序就停止了,不能一直实时的增量读取
>> 
>> 
>> 
>> 
>> 发件人: Paul Lam
>> 发送时间: 2021-02-24 17:03
>> 收件人: user-zh
>> 主题: Re: 社区有人实现过Flink的MongodbSource吗?
>> Hi,
>> 
>> Debezium 支持 MongoDB CDC[1],可以了解下。
>> 
>> [1] https://debezium.io/documentation/reference/connectors/mongodb.html
>> 
>> Best,
>> Paul Lam
>> 
>>> 2021年2月24日 16:23,Evan  写道:
>>> 
>>> 
>>> 有人完整的实现Flink的MongodbSource吗
>>> 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化
>>> 
>>> 
>> 
>> 



flink-connector-hbase-2.2 ??????????????HBaseConnectorITCase ????????

2021-02-24 Thread Anlen
hi,all
flinkmaster
HBase?? 
HBaseConnectorITCase 
testHBaseLookupTableSource??
??
Formatting using clusterid: testClusterID
java.io.IOException: All datanodes 
[DatanodeInfoWithStorage[127.0.0.1:50245,DS-96ca101a-88dd-4171-84a7-1807d74885bd,DISK]]
 are bad. Aborting...
at 
org.apache.hadoop.hdfs.DataStreamer.handleBadDatanode(DataStreamer.java:1440)
at 
org.apache.hadoop.hdfs.DataStreamer.setupPipelineForAppendOrRecovery(DataStreamer.java:1383)
at 
org.apache.hadoop.hdfs.DataStreamer.processDatanodeError(DataStreamer.java:1184)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:611)

UpsertKafka state持续增加问题

2021-02-24 Thread xiaohui zhang
大家好:
我在flink1.12.1上,通过SQL
API测试upsertKafka,使用hdfs保存checkpoint数据,每30分钟进行一次checkpoint。kafka消息key和value均使用json格式。
持续写入300w不同主键的数据,checkpoint大小持续增加,最终生成save point时,大小接近300M。
请问UpsertKafka模式下,state中是否会一直保存所有的key?未被访问的key是否会被清空呢?


回复: Flink 1.12 On Yarn 作业提交失败问题

2021-02-24 Thread 马阳阳
说明一下,yarn.ship-files这个配置的文件夹下需要包含flink-yarn的jar包,可以配置成flink home下的lib文件夹


| |
马阳阳
|
|
ma_yang_y...@163.com
|
签名由网易邮箱大师定制


在2021年02月25日 08:59,马阳阳 写道:
可以试试在flink-conf.yaml里添加如下配置:
yarn.flink-dist-jar: /opt/flink-1.12/lib/flink-dist_2.11-1.12.0.jar
yarn.ship-files: /data/dfl2/lib


这个行为其实很奇怪,在我们的环境里,有的提交任务的机器不需要添加这个配置,有的不加这个配置就会造成那个main class找不到的问题。


Ps: 造成main 
class找不到的原因还可能是程序依赖的版本和部署的flink版本不一致,这种情况可能发生在flink依赖升级之后,部署的flink没有更新或者没有完全更新


| |
马阳阳
|
|
ma_yang_y...@163.com
|
签名由网易邮箱大师定制


在2021年02月23日 22:36,m183 写道:
你是指提交时所依赖的flink-dist jar包需要是 1.12 版本吗,现在改成1.12 版本还是不行

2021年2月23日 下午9:27,LakeShen  写道:

这个应该你的 flink 本地配置的目录要是 1.12 版本的,也就是 flink-dist 目录



凌战  于2021年2月23日周二 下午7:33写道:

同提交作业到On Yarn集群,客户端的错误也是


org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1610671284452_0243 failed
10 times due to AM Container for appattempt_1610671284452_0243_10
exited with  exitCode: 1
Failing this attempt.Diagnostics: [2021-02-23 18:51:00.021]Exception from
container-launch.
Container id: container_e48_1610671284452_0243_10_01
Exit code: 1


[2021-02-23 18:51:00.024]Container exited with a non-zero exit code 1.
Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :


[2021-02-23 18:51:00.027]Container exited with a non-zero exit code 1.
Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :


Yarn那边的日志显示:Could not find or load main class
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint


不过我是Flink 1.12 的API,然后提交的集群还是Flink1.10.1的,不知道哪里的问题


| |
凌战
|
|
m18340872...@163.com
|
签名由网易邮箱大师定制
在2021年2月23日 18:46,LakeShen 写道:
Hi 社区,

最近从 Flink 1.10 升级版本至 Flink 1.12,在提交作业到 Yarn 时,作业一直报错如下:


org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Failed to execute sql

at

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:365)

at

org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:218)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

at

org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

at

org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at

org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)

at

org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Caused by: org.apache.flink.table.api.TableException: Failed to execute sql

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)

at

com.youzan.bigdata.FlinkStreamSQLDDLJob.lambda$main$0(FlinkStreamSQLDDLJob.java:95)

at

java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)

at
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)

at
com.youzan.bigdata.FlinkStreamSQLDDLJob.main(FlinkStreamSQLDDLJob.java:93)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:348)

... 11 more

Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
Could not deploy Yarn job cluster.

at

org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)

at

org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)

at

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)

at

org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)

at

org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:681)

... 22 more

Caused by:
org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application 

回复: Flink 1.12 On Yarn 作业提交失败问题

2021-02-24 Thread 马阳阳
可以试试在flink-conf.yaml里添加如下配置:
yarn.flink-dist-jar: /opt/flink-1.12/lib/flink-dist_2.11-1.12.0.jar
yarn.ship-files: /data/dfl2/lib


这个行为其实很奇怪,在我们的环境里,有的提交任务的机器不需要添加这个配置,有的不加这个配置就会造成那个main class找不到的问题。


Ps: 造成main 
class找不到的原因还可能是程序依赖的版本和部署的flink版本不一致,这种情况可能发生在flink依赖升级之后,部署的flink没有更新或者没有完全更新


| |
马阳阳
|
|
ma_yang_y...@163.com
|
签名由网易邮箱大师定制


在2021年02月23日 22:36,m183 写道:
你是指提交时所依赖的flink-dist jar包需要是 1.12 版本吗,现在改成1.12 版本还是不行

2021年2月23日 下午9:27,LakeShen  写道:

这个应该你的 flink 本地配置的目录要是 1.12 版本的,也就是 flink-dist 目录



凌战  于2021年2月23日周二 下午7:33写道:

同提交作业到On Yarn集群,客户端的错误也是


org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1610671284452_0243 failed
10 times due to AM Container for appattempt_1610671284452_0243_10
exited with  exitCode: 1
Failing this attempt.Diagnostics: [2021-02-23 18:51:00.021]Exception from
container-launch.
Container id: container_e48_1610671284452_0243_10_01
Exit code: 1


[2021-02-23 18:51:00.024]Container exited with a non-zero exit code 1.
Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :


[2021-02-23 18:51:00.027]Container exited with a non-zero exit code 1.
Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :


Yarn那边的日志显示:Could not find or load main class
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint


不过我是Flink 1.12 的API,然后提交的集群还是Flink1.10.1的,不知道哪里的问题


| |
凌战
|
|
m18340872...@163.com
|
签名由网易邮箱大师定制
在2021年2月23日 18:46,LakeShen 写道:
Hi 社区,

最近从 Flink 1.10 升级版本至 Flink 1.12,在提交作业到 Yarn 时,作业一直报错如下:


org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Failed to execute sql

at

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:365)

at

org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:218)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

at

org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

at

org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at

org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)

at

org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Caused by: org.apache.flink.table.api.TableException: Failed to execute sql

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)

at

com.youzan.bigdata.FlinkStreamSQLDDLJob.lambda$main$0(FlinkStreamSQLDDLJob.java:95)

at

java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)

at
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)

at
com.youzan.bigdata.FlinkStreamSQLDDLJob.main(FlinkStreamSQLDDLJob.java:93)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:348)

... 11 more

Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
Could not deploy Yarn job cluster.

at

org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)

at

org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)

at

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)

at

org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)

at

org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:681)

... 22 more

Caused by:
org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1613992328588_4441 failed 2
times due to AM Container for appattempt_1613992328588_4441_02 exited
with  exitCode: 1
Diagnostics: Exception from 

BackPressure in RowTime Task of FlinkSql Job

2021-02-24 Thread Aeden Jameson
I have a job made up of a few FlinkSQL statements using a
statement set. In my job graph viewed through the Flink UI a few of
the tasks/statements are preceded by this task

rowtime field: (#11: event_time TIME ATTRIBUTE(ROWTIME))

that has an upstream Kafka source/sink task.

Occasionally, some of the rowtime tasks appear back pressured
meaning they have high Outpool buffer usage however all of  downstream
sql tasks have low InPool and OutPool usage.  Also, the CPU and
memory, noo OOM errors, usage is also at acceptable levels as far as I
can tell. Another symptom I notice during these episodes is high
consumer fetch latency with Kafka, but I haven't been able to put my
finger on the direction of the causal arrow. What are some causes of
this behavior and what are the best metrics to look at?

Thank you,
Aeden


Is it possible to specify max process memory in flink 1.8.2, similar to what is possible in flink 1.11

2021-02-24 Thread Bariša
I'm running flink 1.8.2 in a container, and under heavy load, container
gets OOM from the kernel.
I'm guessing that that reason for the kernel OOM is large size of the
off-heap memory. Is there a way I can limit it in flink 1.8.2?

I can see that newer version of flink has a config param, checking here is
it possible to do something similar in flink 1.8.2, without a flink upgrade?

Cheers,
Barisa


Re: Does the Kafka source perform retractions on Key?

2021-02-24 Thread Rex Fenley
All of our Flink jobs are (currently) used for web applications at the end
of the day. We see a lot of latency spikes from record amplification and we
were at first hoping we could pass intermediate results through Kafka and
compact them to lower the record amplification, but then it hit me that
this might be an issue.

Thanks for the detailed explanation, though it seems like we'll need to
look for a different solution or only compact on records we know will never
mutate.

On Wed, Feb 24, 2021 at 6:38 AM Arvid Heise  wrote:

> Jan's response is correct, but I'd like to emphasize the impact on a Flink
> application.
>
> If the compaction happens before the data arrives in Flink, the
> intermediate updates are lost and just the final result appears.
> Also if you restart your Flink application and reprocess older data, it
> will naturally only see the compacted data save for the active segment.
>
> So how to make it deterministic? Simply drop topic compaction. If it's
> coming from CDC and you want to process and produce changelog streams over
> several applications, you probably don't want to use log compactions
> anyways.
>
> Log compaction only makes sense in the snapshot topic that displays the
> current state (KTable), where you don't think in CDC updates anymore but
> just final records, like
> (user_id: 1, state: "california")
> (user_id: 1, state: "ohio")
>
> Usually, if you use CDC in your company, each application is responsible
> for building its own current model by tapping in the relevant changes. Log
> compacted topics would then only appear at the end of processing, when you
> hand it over towards non-analytical applications, such as Web Apps.
>
> On Wed, Feb 24, 2021 at 10:01 AM Jan Lukavský  wrote:
>
>> Hi Rex,
>>
>> If I understand correctly, you are concerned about behavior of Kafka
>> source in the case of compacted topic, right? If that is the case, then
>> this is not directly related to Flink, Flink will expose the behavior
>> defined by Kafka. You can read about it for instance here [1]. TL;TD - your
>> pipeline is guaranteed to see every record written to topic (every single
>> update, be it later "overwritten" or not) if it processes the record with
>> latency at most 'delete.retention.ms'. This is configurable per topic -
>> default 24 hours. If you want to reprocess the data later, your consumer
>> might see only resulting compacted ("retracted") stream, and not every
>> record actually written to the topic.
>>
>>  Jan
>>
>> [1]
>> https://medium.com/swlh/introduction-to-topic-log-compaction-in-apache-kafka-3e4d4afd2262
>> On 2/24/21 3:14 AM, Rex Fenley wrote:
>>
>> Apologies, forgot to finish. If the Kafka source performs its own
>> retractions of old data on key (user_id) for every append it receives, it
>> should resolve this discrepancy I think.
>>
>> Again, is this true? Anything else I'm missing?
>>
>> Thanks!
>>
>>
>> On Tue, Feb 23, 2021 at 6:12 PM Rex Fenley  wrote:
>>
>>> Hi,
>>>
>>> I'm concerned about the impacts of Kafka's compactions when sending data
>>> between running flink jobs.
>>>
>>> For example, one job produces retract stream records in sequence of
>>> (false, (user_id: 1, state: "california") -- retract
>>> (true, (user_id: 1, state: "ohio")) -- append
>>> Which is consumed by Kafka and keyed by user_id, this could end up
>>> compacting to just
>>> (true, (user_id: 1, state: "ohio")) -- append
>>> If some other downstream Flink job has a filter on state == "california"
>>> and reads from the Kafka stream, I assume it will miss the retract message
>>> altogether and produce incorrect results.
>>>
>>> Is this true? How do we prevent this from happening? We need to use
>>> compaction since all our jobs are based on CDC and we can't just drop data
>>> after x number of days.
>>>
>>> Thanks
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com  |  BLOG 
>>>  |  FOLLOW US   |  LIKE US
>>> 
>>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com  |  BLOG 
>>  |  FOLLOW US   |  LIKE US
>> 
>>
>>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



Re: Kafka SQL Connector: dropping events if more partitions then source tasks

2021-02-24 Thread Benchao Li
Hi Jan,

What you are observing is correct for the current implementation.

Current watermark generation is based on subtask instead of partition.
Hence if there are
more than on partition in the same subtask, it's very easy to see more data
dropped.

AFAIK, FLIP-27 could solve this problem, however the Kafka Connector has
not been
migrated to FLIP-27 for now.


Jan Oelschlegel  于2021年2月24日周三
下午10:07写道:

> Hi Arvid,
>
>
>
> thanks for bringing back this topic.
>
>
>
> Yes, I’m running on historic data, but as you mentioned that should not be
> the problem, even there is a event-time skew between partitions.
>
>
>
> But maybe this issue with the missing watermark pushdown per partition  is
> the important fact:
>
>
>
> https://issues.apache.org/jira/browse/FLINK-20041
>
>
>
>
>
> Best,
>
> Jan
>
>
>
> *Von:* Arvid Heise 
> *Gesendet:* Mittwoch, 24. Februar 2021 14:10
> *An:* Jan Oelschlegel 
> *Cc:* user ; Timo Walther 
> *Betreff:* Re: Kafka SQL Connector: dropping events if more partitions
> then source tasks
>
>
>
> Hi Jan,
>
>
>
> Are you running on historic data? Then your partitions might drift apart
> quickly.
>
>
>
> However, I still suspect that this is a bug (Watermark should only be from
> the slowest partition). I'm pulling in Timo who should know more.
>
>
>
>
>
>
>
> On Fri, Feb 19, 2021 at 10:50 AM Jan Oelschlegel <
> oelschle...@integration-factory.de> wrote:
>
> If i increase the watermark, the dropped events getting lower. But why is
> the DataStream API Job still running with 12 hours watermark delay?
>
> By the way, I’m using Flink 1.11. It would be nice if someone could give
> me some advice.
>
>
>
> Best,
>
> Jan
>
>
>
> *Von:* Jan Oelschlegel 
> *Gesendet:* Donnerstag, 18. Februar 2021 09:51
> *An:* Jan Oelschlegel ; user <
> user@flink.apache.org>
> *Betreff:* AW: Kafka SQL Connector: dropping events if more partitions
> then source tasks
>
>
>
> By  using the DataStream API with the same business logic I’m getting no
> dropped events.
>
>
>
> *Von:* Jan Oelschlegel 
> *Gesendet:* Mittwoch, 17. Februar 2021 19:18
> *An:* user 
> *Betreff:* Kafka SQL Connector: dropping events if more partitions then
> source tasks
>
>
>
> Hi,
>
>
>
> i have a question regarding FlinkSQL connector for Kafka. I have 3 Kafka
> partitions and 1 Kafka SQL source connector (Parallelism 1). The data
> within the Kafka parttitons are sorted based on a event-time field, which
> is also my event-time in Flink. My Watermark is generated with a delay of
> 12 hours
>
>
>
> WATERMARK FOR eventtime as eventtime - INTERVAL '12' HOUR
>
>
>
>
>
> But the problem is that I see dropping events due arriving late in
> Prometheus.  But with parallelism of 3  there are no drops.
>
>
>
> Do I always have to have as much source-tasks as I have Kafka partitions?
>
>
>
>
>
>
>
> Best,
>
> Jan
>
> HINWEIS: Dies ist eine vertrauliche Nachricht und nur für den Adressaten
> bestimmt. Es ist nicht erlaubt, diese Nachricht zu kopieren oder Dritten
> zugänglich zu machen. Sollten Sie diese Nachricht irrtümlich erhalten
> haben, bitte ich um Ihre Mitteilung per E-Mail oder unter der oben
> angegebenen Telefonnummer.
>
> HINWEIS: Dies ist eine vertrauliche Nachricht und nur für den Adressaten
> bestimmt. Es ist nicht erlaubt, diese Nachricht zu kopieren oder Dritten
> zugänglich zu machen. Sollten Sie diese Nachricht irrtümlich erhalten
> haben, bitte ich um Ihre Mitteilung per E-Mail oder unter der oben
> angegebenen Telefonnummer.
>
> HINWEIS: Dies ist eine vertrauliche Nachricht und nur für den Adressaten
> bestimmt. Es ist nicht erlaubt, diese Nachricht zu kopieren oder Dritten
> zugänglich zu machen. Sollten Sie diese Nachricht irrtümlich erhalten
> haben, bitte ich um Ihre Mitteilung per E-Mail oder unter der oben
> angegebenen Telefonnummer.
>


-- 

Best,
Benchao Li


Re: Flink 1.11.2 test cases fail with Scala 2.12.12

2021-02-24 Thread soumoks
Thank you! I had scala-library 2.12.8 in my dependency tree (Probably a
remnant from when I was testing with Scala 2.12.8). 

I did the following to fix this issue.

Removed  scala-library 2.12.8 from my dependency tree and added the below
dependency.



org.scala-lang
scala-library
2.12.7







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink custom trigger use case

2021-02-24 Thread Diwakar Jha
Hi Arvid,

Thanks. I tried FIRE instead of FIRE_AND_PURGE and it introduced duplicates
though the result is still the same i.e record 1 is fired both at the start
and the end of the window. so for every window i see the first event of the
window is coming twice in the output.

I'm trying to explain again the desired behaviour, hopefully it becomes
clear.

all the records have the same key.
current output.

> record 1 : first event in the window-1 : fired
> record 2 : last event in the window-1 : fired
> record 3 : first event in the window-2 : fired. [this should not have
> fired since it has the same Key as all other records.]
> record 4, record 5 : - 2 events in the window-2 : fired.
>

expected output.

> record 1 : first event in the window-1 : fired
> record 2 : last event in the window-1 : fired
> record 3,4,5 : all event in the window-2 : fired


I think my problem is to store KeyBy values between windows. For example, I
want to retain the KeyBy for 1 day. In that case, record 1 is fired
instantly, all other records (of same key as record1) are always grouped in
each window (say 1 min) instead of firing instantly.

Thanks!

On Wed, Feb 24, 2021 at 6:19 AM Arvid Heise  wrote:

> Hi Diwakar,
>
> the issue is that you fire_and_purge the state, you should just FIRE on
> the first element (or else you lose the information that you received the
> element already).
> You'd use FIRE_AND_PURGE on the last element though.
>
> On Wed, Feb 24, 2021 at 7:16 AM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Diwakar,
>>
>> I'm not sure I fully understand your question.
>> If event handling in one window depends on some other windows than
>> TriggerContext.getPartitionedState can not be used. Triggers don't have
>> access to the global state (only to key-window scoped state).
>> If that's what you want then please consider ProcessWindowFunction [1]
>> where you can use context.globalState() in your process function.
>>
>> [1]
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
>>
>> Regards,
>> Roman
>>
>>
>> On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha 
>> wrote:
>>
>>>
>>> Hello,
>>>
>>> I'm trying to use a custom trigger for one of my use case. I have a
>>> basic logic (as shown below) of using keyBy on the input stream and using a
>>> window of 1 min.
>>>
>>> .keyBy()
>>> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>>> .trigger(new CustomTrigger())
>>> .aggregate(Input.getAggregationFunction(), new
>>> AggregationProcessingWindow());
>>>
>>>
>>> My custom trigger is expected to fire the first event of the keyBy
>>> instantly and any subsequent events should be aggregated in the window.
>>>
>>> .trigger(new Trigger() {
 @Override
 public TriggerResult onElement(Record record, long l, TimeWindow
 timeWindow, TriggerContext triggerContext) throws Exception {
 ValueState firstSeen =
 triggerContext.getPartitionedState(firstSceenDescriptor);
 if(firstSeen.value() == null) {
 firstSeen.update(true);
 // fire trigger to early evaluate window and purge that event.
 return TriggerResult.FIRE_AND_PURGE;
 }
 // Continue. Do not evaluate window per element
 return TriggerResult.CONTINUE;
 }
 @Override
 public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
 TriggerContext triggerContext) throws Exception {
 // final evaluation and purge window state
 return TriggerResult.FIRE_AND_PURGE;
 }
 @Override
 public TriggerResult onEventTime(long l, TimeWindow timeWindow,
 TriggerContext triggerContext) throws Exception {
 return TriggerResult.CONTINUE;
 }
 @Override
 public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
 throws Exception {

 }
 })
>>>
>>>
>>>
>>>
>>> Currently, I see (for each window and same key) the first event of the
>>> window is always fired. But I want to see this happening for only the first
>>> window and for the subsequent window it should aggregate all the events and
>>> then fire.
>>>
>>> Example : all the records have the same key.
>>> current output.
>>> record 1 : first event in the window-1 : fired record 2 : last event in
>>> the window-1 : fired record 3 : first event in the window-2 : fired record
>>> 4, record 5 : - 2 events in the window-2 : fired.
>>>
>>> expected output.
>>> record 1 : first event in the window-1 : fired record 2 : last event in
>>> the window-1 : fired record 3,4,5 : all event in the window-2 : fired
>>> window-2 should not fire the first event of the same key.
>>>
>>> I'm reading it here
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
>>> but not able to solve it. Any pointers would be helpful.
>>>
>>> Thanks.
>>>
>>


Re: Flink job finished unexpected

2021-02-24 Thread Rainie Li
I see, I will check tm log.
Thank you Arvid.

Best regards
Rainie

On Wed, Feb 24, 2021 at 5:27 AM Arvid Heise  wrote:

> Hi Rainie,
>
> there are two probably causes:
> * Network instabilities
> * Taskmanager died, then you can further dig in the taskmanager logs for
> errors right before that time.
>
> In both cases, Flink should restart the job with the correct restart
> policies if configured.
>
> On Sat, Feb 20, 2021 at 10:07 PM Rainie Li  wrote:
>
>> Hello,
>>
>> I launched a job with a larger load on hadoop yarn cluster.
>> The Job finished after running 5 hours, I didn't find any error from
>> JobManger log besides this connect exception.
>>
>>
>>
>>
>>
>> *2021-02-20 13:20:14,110 WARN  akka.remote.transport.netty.NettyTransport
>>- Remote connection to [/10.1.57.146:48368
>> ] failed with java.io.IOException: Connection
>> reset by peer2021-02-20 13:20:14,110 WARN
>>  akka.remote.ReliableDeliverySupervisor-
>> Association with remote system [akka.tcp://flink-metrics@host:35241] has
>> failed, address is now gated for [50] ms. Reason: [Disassociated]
>> 2021-02-20 13:20:14,110 WARN  akka.remote.ReliableDeliverySupervisor
>>  - Association with remote system
>> [akka.tcp://flink@host:39493] has failed, address is now gated for [50] ms.
>> Reason: [Disassociated] 2021-02-20 13:20:14,110 WARN
>>  akka.remote.ReliableDeliverySupervisor-
>> Association with remote system [akka.tcp://flink-metrics@host:38481] has
>> failed, address is now gated for [50] ms. Reason: [Disassociated] *
>>
>> Any idea what caused the job to be finished and how to resolve it?
>> Any suggestions are appreciated.
>>
>> Thanks
>> Best regards
>> Rainie
>>
>


Flink 1.12.1 example applications failing on a single node yarn cluster

2021-02-24 Thread Debraj Manna
I am trying out flink example as explained in flink docs

in
a single node yarn cluster

.

On executing

ubuntu@vrni-platform:~/build-target/flink$ ./bin/flink run-application -t
yarn-application ./examples/streaming/TopSpeedWindowing.jar

It is failing with the below errors

org.apache.flink.client.deployment.ClusterDeploymentException:
Couldn't deploy Yarn Application Cluster
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:465)
at 
org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)
at 
org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1061)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException:
The YARN application unexpectedly switched to state FAILED during
deployment.
Diagnostics from YARN: Application application_1614159836384_0045
failed 1 times (global limit =2; local limit is =1) due to AM
Container for appattempt_1614159836384_0045_01 exited with
exitCode: -1000
Failing this attempt.Diagnostics: [2021-02-24 16:19:39.409]File
file:/home/ubuntu/.flink/application_1614159836384_0045/flink-dist_2.12-1.12.1.jar
does not exist
java.io.FileNotFoundException: File
file:/home/ubuntu/.flink/application_1614159836384_0045/flink-dist_2.12-1.12.1.jar
does not exist
at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:641)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:867)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:631)
at 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:442)
at org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:269)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:67)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:414)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:411)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:411)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:242)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:235)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:223)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

I have made the log level DEBUG and I do see that
flink-dist_2.12-1.12.1.jar is getting copied to
/home/ubuntu/.flink/application_1614159836384_0045.

2021-02-24 16:19:37,768 DEBUG
org.apache.flink.yarn.YarnApplicationFileUploader[] - Got
modification time 1614183577000 from remote path
file:/home/ubuntu/.flink/application_1614159836384_0045/TopSpeedWindowing.jar
2021-02-24 16:19:37,769 DEBUG
org.apache.flink.yarn.YarnApplicationFileUploader[] -
Copying from file:/home/ubuntu/build-target/flink/lib/flink-dist_2.12-1.12.1.jar
to 
file:/home/ubuntu/.flink/application_1614159836384_0045/flink-dist_2.12-1.12.1.jar
with replication factor 1

The entire DEBUG logs are placed here
.
Nodemanager logs are placed here
.

Can someone let me know what is going wrong? Does flink not support
single node yarn 

Re: Flink Statefun TTL

2021-02-24 Thread Tzu-Li (Gordon) Tai
Hi Timothy,

Starting from StateFun 2.2.x, in the module.yaml file, you can set for each
individual state of a function an "expireMode" field, which values can be
either "after-invoke" or "after-write". For example:

```
- function:
meta:
  ...
spec:
  states:
- name: state-1
  expireMode: after-write
  expireAfter: 1min
- name: state-2
  expireMode: after-invoke
  expireAfter: 5sec
```

In earlier versions, expireMode can not be individually set for each state.
This is more flexible with 2.2.x.

As a side note which is somewhat related, all state related configurations
will be removed from the module.yaml, instead to be defined by the language
SDKs starting from StateFun 3.0.
This opens up even more flexibility, such as zero-downtime upgrades of
remote functions which allows adding / removing state declarations without
restarting the StateFun cluster.
We're planning to reach out to the language SDK developers we know of
(which includes you for the Haskell SDK ;) ) soon on a briefing of this
change, as there is a change in the remote invocation protocol and will
require existing SDKs to be updated in order to work with StateFun 3.0.

Cheers,
Gordon

On Wed, Feb 24, 2021 at 11:00 PM Timothy Bess  wrote:

> Hey,
>
> I noticed that the Flink Statefun 2.1.0 release notes had this snippet
> with regards to TTL:
>
> Note: The state expiration mode for remote functions is currently
>> restricted to AFTER_READ_AND_WRITE, and the actual TTL being set is the
>> longest duration across all registered state, not for each individual state
>> entry. This is planned to be improved in upcoming releases (FLINK-17954).
>>
>
> I noticed that the Ticket and PR for this have been closed with a
> reference to commit "289c30e8cdb54d2504ee47a57858a1d179f9a540". Does this
> mean that if I upgrade to 2.2.2 and set an expiration in my modules.yaml it
> is now "per function id" rather than across instances of said function?
>
> Thanks,
>
> Tim
>


Re: Jackson object serialisations

2021-02-24 Thread Maciej Obuchowski
Hey Lasse,
I've had a similar case, albeit with Avro. I was reading from multiple
Kafka topics, which all had different objects and did some metadata
driven operations on them.
I could not go with any concrete predefined types for them, because
there were hundreds of different object types.

My solution was to serialize the object itself manually as byte[] and
deserialize it manually in operator.
You can do it the same way using something like
objectMapper.writeValueAsBytes and transfer data as Tuple2.

Overall, Flink does not support "dynamic" data types very well.

Regards,
Maciej

śr., 24 lut 2021 o 17:08 Lasse Nedergaard
 napisał(a):
>
> Hi
>
> I’m looking for advice for the best and simplest solution to handle JSON in 
> Flink.
>
> Our system is data driven and based on JSON. As the structure isn’t static 
> mapping it to POJO isn’t an option I therefore transfer ObjectNode and / or 
> ArrayNode between operators either in Tuples
> Tuple2 or as attributes in POJO’s.
>
> Flink doesn’t know about Jackson objects and therefore fail back to Kryo
>
> I see two options.
> 1. Add kryo serialisation objects for all the Jackson types we use and 
> register them.
> 2. Add Jackson objects as Flink types.
>
> I guess option 2 perform best, but it require an annotation for the classes 
> and I can’t do that for 3. Party objects. One workaround could be to create 
> my own objects that extends the Jackson objects and use them between 
> operators.
>
> I can’t be the first to solve this problem so I like to hear what the 
> community suggests.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>


Re: BroadcastState dropped when data deleted in Kafka

2021-02-24 Thread bat man
Hi Arvid,

The Flink application was not re-started. I had checked on that.
By adding rules to the state of process function you mean the state which
is local to the keyedprocess function?
>From [1] what is being done here -

final MapState> state = getRuntimeContext().getMapState(
mapStateDesc);

state.put(ruleName, stored);


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

Thanks.


On Wed, Feb 24, 2021 at 7:52 PM Arvid Heise  wrote:

> Could you double-check if your Flink application was restarted between
> Kafka topic was cleared and the time you saw that the rules have been lost?
>
> I suspect that you deleted the Kafka topic and the Flink application then
> failed and restarted. Upon restart it read the empty rule topic.
>
> To solve it, you probably want to add the rules to the state of your
> process function [1]. If you have done that, I'm a bit lost.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>
> On Wed, Feb 24, 2021 at 7:30 AM bat man  wrote:
>
>> Hi,
>>
>> This is my code below -
>> As mentioned earlier the rulesStream us again used in later processing.
>> Below you can see the rulesStream is again connected with the result stream
>> of the first process stream. Do you think this is the reason rules
>> operators state getting overridden when the data in kafka is deleted?
>> My question is if the data is not present in kafka then no data is read
>> in stream how it is updating the existing state data.
>>
>> public static final MapStateDescriptor rulesDescriptor =
>> new MapStateDescriptor<>(
>> "rules", BasicTypeInfo.INT_TYPE_INFO, 
>> TypeInformation.of(Rule.class));
>>
>> KafkaSource = getKafkaSource(config.get(RAW_EVENT_TOPIC));
>> DataStream rawEventStream = 
>> validateData(getRawEventStream(rawEventKafkaSource,env));
>>
>>  rulesKafkaSource = getKafkaSource(config.get(RULES_TOPIC));
>>  DataStream rulesDataStream = getRulesStream(rulesKafkaSource,env);
>>
>>  deviceSource = getKafkaSource(config.get(DEVICE_EVENT_TOPIC));
>>  DataStream deviceDataStream = getDeviceStream(deviceSource,env);
>>
>>  BroadcastStream rulesStream = 
>> rulesDataStream.broadcast(rulesDescriptor);
>>
>>  SingleOutputStreamOperator> 
>> keyedSingleOutputStream =
>>  rawEventStream.
>>  connect(rulesStream).
>>  process(new 
>> DynamicKeyFunction()).name("keyed").uid("keyed").setParallelism(5);
>>
>>  SingleOutputStreamOperator rtEventDataStream =
>>  keyedSingleOutputStream.
>>  keyBy((keyed) -> keyed.getKey()).
>>  connect(rulesStream).
>>  process(new 
>> DynamicTransformFunction()).name("rt").uid("rt").setParallelism(5);
>>
>>
>> On Tue, Feb 23, 2021 at 10:32 PM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Deletion of messages in Kafka shouldn't affect Flink state in general.
>>> Probably, some operator in your pipeline is re-reading the topic
>>> and overwrites the state, dropping what was deleted by Kafka.
>>> Could you share the code?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Tue, Feb 23, 2021 at 7:12 AM bat man  wrote:
>>>
 Hi,

 I have 2 streams one event data and the other rules. I broadcast the
 rules stream and then key the data stream on event type. The connected
 stream is processed thereafter.
 We faced an issue where the rules data in the topic got deleted because
 of Kafka retention policy.
 Post this the existing rules data also got dropped in the broadcast
 state and the processing stopped.

 As per my understanding the rules which were present in broadcast state
 should still exist even if the data was deleted in Kafka as the rules dats
 was already processed and stored in state map.

 PS: I’m reusing the rules stream as broadcast later in processing as
 well. Could this be an issue?

 Thanks,
 Hemant

>>>


Jackson object serialisations

2021-02-24 Thread Lasse Nedergaard
Hi

I’m looking for advice for the best and simplest solution to handle JSON in 
Flink.

Our system is data driven and based on JSON. As the structure isn’t static 
mapping it to POJO isn’t an option I therefore transfer ObjectNode and / or 
ArrayNode between operators either in Tuples 
Tuple2 or as attributes in POJO’s.

Flink doesn’t know about Jackson objects and therefore fail back to Kryo 

I see two options. 
1. Add kryo serialisation objects for all the Jackson types we use and register 
them. 
2. Add Jackson objects as Flink types. 

I guess option 2 perform best, but it require an annotation for the classes and 
I can’t do that for 3. Party objects. One workaround could be to create my own 
objects that extends the Jackson objects and use them between operators. 

I can’t be the first to solve this problem so I like to hear what the community 
suggests. 

Med venlig hilsen / Best regards
Lasse Nedergaard



Flink Statefun TTL

2021-02-24 Thread Timothy Bess
Hey,

I noticed that the Flink Statefun 2.1.0 release notes had this snippet with
regards to TTL:

Note: The state expiration mode for remote functions is currently
> restricted to AFTER_READ_AND_WRITE, and the actual TTL being set is the
> longest duration across all registered state, not for each individual state
> entry. This is planned to be improved in upcoming releases (FLINK-17954).
>

I noticed that the Ticket and PR for this have been closed with a reference
to commit "289c30e8cdb54d2504ee47a57858a1d179f9a540". Does this mean that
if I upgrade to 2.2.2 and set an expiration in my modules.yaml it is now
"per function id" rather than across instances of said function?

Thanks,

Tim


UpsertKafka状态保存问题

2021-02-24 Thread xiaohui zhang
大家好:
我在flink1.12.1上,通过SQL 
API测试upsertKafka,使用hdfs保存checkpoint数据,每30分钟进行一次checkpoint。kafka消息key和value均使用json格式。
持续写入300w不同主键的数据,checkpoint大小持续增加,最终生成save point时,大小接近300M。
请问UpsertKafka模式下,state中是否会一直保存所有的key?未被访问的key是否会被清空呢?

Re: Co-relate two streams

2021-02-24 Thread Arvid Heise
Hi Abhinav,

sounds like you want to implement a join [1]. You usually want to use a
window and then correlate the data between them only within the timeframe.
You can use global windows if you cannot add a time window, but note that
the state will grow indefinitely.

If one of the sources is small, also consider the broadcast state pattern.
[2]

Note that if you are application is only doing standard relational algebra,
I'd recommend Table API/SQL which will produce faster applications [3].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html#the-broadcast-state-pattern
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html

On Wed, Feb 24, 2021 at 11:14 AM Abhinav Sharma 
wrote:

> Hi,
>
> How can I co-relate two streams of different types in Flink?
> Scenario: In stream1, I have data in pojo with a field user. In stream2, I
> have data in a different pojo which also contains the field user. (However,
> other than the user field, they have no common field).
>
> Now what I want to do is relate the two streams such that for every event
> in stream1, I want to collect events from stream2 where the user is the
> same. Both stream1 and stream2 are unbounded.
>
> I tried using
> stream1.connect(stream2).process(new CoProcessFunction Type2>) {
> private String user;
>
> public void processElement1(Type1 inp, CoProcessFunction Type2>.Context ctx, Collector out)  {
> user = inp.getUser();
> }
>
> public void processElement2(Type2 inp, CoProcessFunction Type2>.Context ctx, Collector out)  {
> if (user.equals(inp.getUser())) {
> out.collect(inp);
> }
> }
> });
>
> But this works only and only if both elements occur simultaneously.
>
> How can I collect the cases with history? Is using ListState required?
> Is there some better way to this in Flink?
>
>
> Requesting help,
> Abhinav
>


Re: Does the Kafka source perform retractions on Key?

2021-02-24 Thread Arvid Heise
Jan's response is correct, but I'd like to emphasize the impact on a Flink
application.

If the compaction happens before the data arrives in Flink, the
intermediate updates are lost and just the final result appears.
Also if you restart your Flink application and reprocess older data, it
will naturally only see the compacted data save for the active segment.

So how to make it deterministic? Simply drop topic compaction. If it's
coming from CDC and you want to process and produce changelog streams over
several applications, you probably don't want to use log compactions
anyways.

Log compaction only makes sense in the snapshot topic that displays the
current state (KTable), where you don't think in CDC updates anymore but
just final records, like
(user_id: 1, state: "california")
(user_id: 1, state: "ohio")

Usually, if you use CDC in your company, each application is responsible
for building its own current model by tapping in the relevant changes. Log
compacted topics would then only appear at the end of processing, when you
hand it over towards non-analytical applications, such as Web Apps.

On Wed, Feb 24, 2021 at 10:01 AM Jan Lukavský  wrote:

> Hi Rex,
>
> If I understand correctly, you are concerned about behavior of Kafka
> source in the case of compacted topic, right? If that is the case, then
> this is not directly related to Flink, Flink will expose the behavior
> defined by Kafka. You can read about it for instance here [1]. TL;TD - your
> pipeline is guaranteed to see every record written to topic (every single
> update, be it later "overwritten" or not) if it processes the record with
> latency at most 'delete.retention.ms'. This is configurable per topic -
> default 24 hours. If you want to reprocess the data later, your consumer
> might see only resulting compacted ("retracted") stream, and not every
> record actually written to the topic.
>
>  Jan
>
> [1]
> https://medium.com/swlh/introduction-to-topic-log-compaction-in-apache-kafka-3e4d4afd2262
> On 2/24/21 3:14 AM, Rex Fenley wrote:
>
> Apologies, forgot to finish. If the Kafka source performs its own
> retractions of old data on key (user_id) for every append it receives, it
> should resolve this discrepancy I think.
>
> Again, is this true? Anything else I'm missing?
>
> Thanks!
>
>
> On Tue, Feb 23, 2021 at 6:12 PM Rex Fenley  wrote:
>
>> Hi,
>>
>> I'm concerned about the impacts of Kafka's compactions when sending data
>> between running flink jobs.
>>
>> For example, one job produces retract stream records in sequence of
>> (false, (user_id: 1, state: "california") -- retract
>> (true, (user_id: 1, state: "ohio")) -- append
>> Which is consumed by Kafka and keyed by user_id, this could end up
>> compacting to just
>> (true, (user_id: 1, state: "ohio")) -- append
>> If some other downstream Flink job has a filter on state == "california"
>> and reads from the Kafka stream, I assume it will miss the retract message
>> altogether and produce incorrect results.
>>
>> Is this true? How do we prevent this from happening? We need to use
>> compaction since all our jobs are based on CDC and we can't just drop data
>> after x number of days.
>>
>> Thanks
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com  |  BLOG 
>>  |  FOLLOW US   |  LIKE US
>> 
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>
>


Re: Window Process function is not getting trigger

2021-02-24 Thread sagar
Thanks Kezhu, It worked!!!

On Wed, Feb 24, 2021 at 2:47 PM Kezhu Wang  wrote:

> Try `env.setParallelism(1)`. Default parallelism for local environment is
> `Runtime.getRuntime.availableProcessors`.
>
> You test data set are so small that when they are scatter cross multiple
> parallel instances, there will be no data with event time assigned to
> trigger downstream computation.
>
> Or you could try `WatermarkStrategy.withIdleness`.
>
>
> Best,
> Kezhu Wang
>
> On February 24, 2021 at 15:43:47, sagar (sagarban...@gmail.com) wrote:
>
> It is fairly simple requirement, if I changed it to PRocessing time it
> works fine , but not working with event time..help appreciated!
>
> On Wed, Feb 24, 2021 at 10:51 AM sagar  wrote:
>
>> HI
>>
>> Corrected with below code, but still getting same issue
>>
>> Instant instant = 
>> p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant();
>> long timeInMillis = instant.toEpochMilli();
>> System.out.println(timeInMillis);
>> return timeInMillis;
>>
>>
>> On Wed, Feb 24, 2021 at 10:34 AM Kezhu Wang  wrote:
>>
>>> I saw one potential issue. Your timestamp assigner returns timestamp in
>>> second resolution while Flink requires millisecond resolution.
>>>
>>>
>>> Best,
>>> Kezhu Wang
>>>
>>> On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote:
>>>
>>> I have simple flink stream program, where I am using socket as my
>>> continuous source
>>> I have window size of 2 seconds.
>>>
>>> Somehow my window process function is not triggering and even if I pass
>>> events in any order, flink is not ignoring
>>>
>>> I can see the output only when I kill my socket , please find the code
>>> snippet below
>>>
>>> final StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>
>>>
>>> DataStream price = env.socketTextStream("localhost",
>>> 9998).uid("price source").map(new MapFunction() {
>>> @Override
>>> public Price map(String s) throws Exception {
>>> return new Price(s.split(",")[0],
>>> LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new
>>> BigDecimal(s.split(",")[3]), s.split(",")[4], new
>>> BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) );
>>> }
>>> }
>>> );
>>>
>>> DataStream priceStream = price
>>>
>>>  
>>> .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
>>> .withTimestampAssigner((p,timestamp) ->
>>> {
>>> ZoneId zoneId = ZoneId.systemDefault();
>>> long epoch =
>>> p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
>>> System.out.println(epoch);
>>>  return epoch;
>>> }))
>>> .keyBy(new KeySelector() {
>>> @Override
>>> public String getKey(Price price) throws Exception {
>>> return price.getPerformanceId();
>>> }
>>> }).window(TumblingEventTimeWindows.of(Time.seconds(2)))
>>> .process(new ProcessWindowFunction>> TimeWindow>() {
>>>
>>> @Override
>>> public void process(String s, Context context,
>>> Iterable iterable, Collector collector) throws Exception {
>>> System.out.println(context.window().getStart()+
>>> "Current watermark: "+context.window().getEnd());
>>> Price p1 = null ;
>>> for(Price p : iterable)
>>> {
>>> System.out.println(p.toString());
>>> p1= p;
>>> }
>>> collector.collect(p1);
>>> }
>>> });
>>>
>>>
>>> priceStream.writeAsText("c:\\ab.txt");
>>>
>>> also data I am inputting are
>>>
>>> p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
>>> p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
>>> p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
>>> p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
>>> p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
>>> p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
>>> p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
>>> p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01
>>>
>>> --
>>> ---Regards---
>>>
>>>   Sagar Bandal
>>>
>>> This is confidential mail ,All Rights are Reserved.If you are not
>>> intended receipiant please ignore this email.
>>>
>>>
>>
>> --
>> ---Regards---
>>
>>   Sagar Bandal
>>
>> This is confidential mail ,All Rights are Reserved.If you are not
>> intended receipiant please ignore this email.
>>
>
>
> --
> ---Regards---
>
>   Sagar Bandal
>
> This is confidential mail ,All Rights are Reserved.If you are not intended
> receipiant please ignore this email.
>
>

-- 
---Regards---

  Sagar Bandal

This is confidential 

Re: Object and Integer size in RocksDB ValueState

2021-02-24 Thread Maciej Obuchowski
Thanks Roman, that's exactly what I needed.

śr., 24 lut 2021 o 14:37 Roman Khachatryan  napisał(a):
>
> Thanks for the clarification.
>
> RocksDB stores whatever value Flink passes to it after serialization.
> The value is passed as an array of bytes so the minimum is single byte.
> Integer would require 4 bytes, Object - 1 or 2 depending on the serializer 
> (Pojo or Kryo), and boolean just 1 byte.
> Besides that, boolean serialization is apparently faster.
>
> Sizes in memory, on disk and of snapshot are all affected proportionally.
>
> You are right regarding Flink compression settings will not have any impact 
> with incremental checkpoints.
>
> Regards,
> Roman
>
>
> On Wed, Feb 24, 2021 at 11:01 AM Maciej Obuchowski 
>  wrote:
>>
>> Hey. Let me send simplified example, because I don't think this
>> "(given that the actual stored objects (integers) are the same)" is
>> true - I'm just storing object as a placeholder:
>>
>> public class DeduplicationProcessFunction extends
>> KeyedProcessFunction implements CheckpointedFunction {
>>
>> private transient ValueState processedState;
>>
>> public DeduplicationProcessFunction() { }
>>
>> @Override
>> public void snapshotState(FunctionSnapshotContext context) throws
>> Exception { }
>>
>> @Override
>> public void initializeState(FunctionInitializationContext context)
>> throws Exception {
>> val descriptor = new ValueStateDescriptor<>("processed",
>> TypeInformation.of(Object.class));
>> processedState = context.getKeyedStateStore().getState(descriptor);
>> }
>>
>> @Override
>> public void processElement(IN value, Context ctx, Collector
>> out) throws Exception {
>> val processed = processedState.value();
>> if (processed == null) {
>> processedState.update(new Object());
>> out.collect(value);
>> }
>> }
>> }
>>
>>
>>
>> Basically, I'm not sure what rocksdb stores in this case. I'm sure
>> that it needs to store key, which is 32byte sha key in this case.
>> What's the value? Is it the 16 bytes that Java requires in-memory? If
>> I'll change my ValueState to integer, and provide additional value
>> there, will it require more storage space? Also, to respond to your
>> point about compression, we're using incremental checkpoints, so I
>> don't think anything will change as per docs. I'm not only interested
>> in snapshot size, but also size of current, in memory and local disk
>> state.
>>
>> Thanks,
>> Maciej
>>
>>
>>
>> wt., 23 lut 2021 o 17:53 Roman Khachatryan  napisał(a):
>> >
>> > Hi Maciej,
>> >
>> > If I understand correctly, you're asking whether ValueState parameterized 
>> > with Object has the same size as the one with Integer (given that the 
>> > actual stored objects (integers) are the same).
>> > With RocksDB, any state object is serialized first and only then it is 
>> > stored in MemTable or in an SST file. So it doesn't matter as long as the 
>> > same serializer is used.
>> >
>> > You probably should try enabling compression if you didn't already: 
>> > https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression
>> >
>> > Regards,
>> > Roman
>> >
>> >
>> > On Tue, Feb 23, 2021 at 12:40 PM Maciej Obuchowski 
>> >  wrote:
>> >>
>> >> Hey.
>> >>
>> >> We have deduplication job that has a large amount of keyed ValueState. We 
>> >> want to decrease state size as much as possible, so we're using 
>> >> ValueState as it's smallest possible Java non-primitive. However, 
>> >> as per https://www.baeldung.com/java-size-of-object (and my measurements) 
>> >> Java Integer has the same memory size as Object due to padding.
>> >> Will this still be true with RocksDB state? Can we put Integer in state 
>> >> without increasing state size?
>> >>
>> >> Thanks, Maciej


Re: BroadcastState dropped when data deleted in Kafka

2021-02-24 Thread Arvid Heise
Could you double-check if your Flink application was restarted between
Kafka topic was cleared and the time you saw that the rules have been lost?

I suspect that you deleted the Kafka topic and the Flink application then
failed and restarted. Upon restart it read the empty rule topic.

To solve it, you probably want to add the rules to the state of your
process function [1]. If you have done that, I'm a bit lost.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

On Wed, Feb 24, 2021 at 7:30 AM bat man  wrote:

> Hi,
>
> This is my code below -
> As mentioned earlier the rulesStream us again used in later processing.
> Below you can see the rulesStream is again connected with the result stream
> of the first process stream. Do you think this is the reason rules
> operators state getting overridden when the data in kafka is deleted?
> My question is if the data is not present in kafka then no data is read in
> stream how it is updating the existing state data.
>
> public static final MapStateDescriptor rulesDescriptor =
> new MapStateDescriptor<>(
> "rules", BasicTypeInfo.INT_TYPE_INFO, 
> TypeInformation.of(Rule.class));
>
> KafkaSource = getKafkaSource(config.get(RAW_EVENT_TOPIC));
> DataStream rawEventStream = 
> validateData(getRawEventStream(rawEventKafkaSource,env));
>
>  rulesKafkaSource = getKafkaSource(config.get(RULES_TOPIC));
>  DataStream rulesDataStream = getRulesStream(rulesKafkaSource,env);
>
>  deviceSource = getKafkaSource(config.get(DEVICE_EVENT_TOPIC));
>  DataStream deviceDataStream = getDeviceStream(deviceSource,env);
>
>  BroadcastStream rulesStream = 
> rulesDataStream.broadcast(rulesDescriptor);
>
>  SingleOutputStreamOperator> 
> keyedSingleOutputStream =
>  rawEventStream.
>  connect(rulesStream).
>  process(new 
> DynamicKeyFunction()).name("keyed").uid("keyed").setParallelism(5);
>
>  SingleOutputStreamOperator rtEventDataStream =
>  keyedSingleOutputStream.
>  keyBy((keyed) -> keyed.getKey()).
>  connect(rulesStream).
>  process(new 
> DynamicTransformFunction()).name("rt").uid("rt").setParallelism(5);
>
>
> On Tue, Feb 23, 2021 at 10:32 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi,
>>
>> Deletion of messages in Kafka shouldn't affect Flink state in general.
>> Probably, some operator in your pipeline is re-reading the topic
>> and overwrites the state, dropping what was deleted by Kafka.
>> Could you share the code?
>>
>> Regards,
>> Roman
>>
>>
>> On Tue, Feb 23, 2021 at 7:12 AM bat man  wrote:
>>
>>> Hi,
>>>
>>> I have 2 streams one event data and the other rules. I broadcast the
>>> rules stream and then key the data stream on event type. The connected
>>> stream is processed thereafter.
>>> We faced an issue where the rules data in the topic got deleted because
>>> of Kafka retention policy.
>>> Post this the existing rules data also got dropped in the broadcast
>>> state and the processing stopped.
>>>
>>> As per my understanding the rules which were present in broadcast state
>>> should still exist even if the data was deleted in Kafka as the rules dats
>>> was already processed and stored in state map.
>>>
>>> PS: I’m reusing the rules stream as broadcast later in processing as
>>> well. Could this be an issue?
>>>
>>> Thanks,
>>> Hemant
>>>
>>


Re: Flink custom trigger use case

2021-02-24 Thread Arvid Heise
Hi Diwakar,

the issue is that you fire_and_purge the state, you should just FIRE on the
first element (or else you lose the information that you received the
element already).
You'd use FIRE_AND_PURGE on the last element though.

On Wed, Feb 24, 2021 at 7:16 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi Diwakar,
>
> I'm not sure I fully understand your question.
> If event handling in one window depends on some other windows than
> TriggerContext.getPartitionedState can not be used. Triggers don't have
> access to the global state (only to key-window scoped state).
> If that's what you want then please consider ProcessWindowFunction [1]
> where you can use context.globalState() in your process function.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
>
> Regards,
> Roman
>
>
> On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha 
> wrote:
>
>>
>> Hello,
>>
>> I'm trying to use a custom trigger for one of my use case. I have a basic
>> logic (as shown below) of using keyBy on the input stream and using a
>> window of 1 min.
>>
>> .keyBy()
>> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>> .trigger(new CustomTrigger())
>> .aggregate(Input.getAggregationFunction(), new
>> AggregationProcessingWindow());
>>
>>
>> My custom trigger is expected to fire the first event of the keyBy
>> instantly and any subsequent events should be aggregated in the window.
>>
>> .trigger(new Trigger() {
>>> @Override
>>> public TriggerResult onElement(Record record, long l, TimeWindow
>>> timeWindow, TriggerContext triggerContext) throws Exception {
>>> ValueState firstSeen =
>>> triggerContext.getPartitionedState(firstSceenDescriptor);
>>> if(firstSeen.value() == null) {
>>> firstSeen.update(true);
>>> // fire trigger to early evaluate window and purge that event.
>>> return TriggerResult.FIRE_AND_PURGE;
>>> }
>>> // Continue. Do not evaluate window per element
>>> return TriggerResult.CONTINUE;
>>> }
>>> @Override
>>> public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
>>> TriggerContext triggerContext) throws Exception {
>>> // final evaluation and purge window state
>>> return TriggerResult.FIRE_AND_PURGE;
>>> }
>>> @Override
>>> public TriggerResult onEventTime(long l, TimeWindow timeWindow,
>>> TriggerContext triggerContext) throws Exception {
>>> return TriggerResult.CONTINUE;
>>> }
>>> @Override
>>> public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
>>> throws Exception {
>>>
>>> }
>>> })
>>
>>
>>
>>
>> Currently, I see (for each window and same key) the first event of the
>> window is always fired. But I want to see this happening for only the first
>> window and for the subsequent window it should aggregate all the events and
>> then fire.
>>
>> Example : all the records have the same key.
>> current output.
>> record 1 : first event in the window-1 : fired record 2 : last event in
>> the window-1 : fired record 3 : first event in the window-2 : fired record
>> 4, record 5 : - 2 events in the window-2 : fired.
>>
>> expected output.
>> record 1 : first event in the window-1 : fired record 2 : last event in
>> the window-1 : fired record 3,4,5 : all event in the window-2 : fired
>> window-2 should not fire the first event of the same key.
>>
>> I'm reading it here
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
>> but not able to solve it. Any pointers would be helpful.
>>
>> Thanks.
>>
>


Re: Flink jobs organization and maintainability

2021-02-24 Thread Arvid Heise
If you have many similar jobs, they should be in the same repo (especially
if they have the same development cycle).

First, how different are the jobs?
A) If they are very similar, go with just one job and configure it
differently for each application. Then you can use different deployments of
the same jar with different parameters/config. If you have deployment by
code, then you will have all deployment files in some special deploy
directory on root.
B) If they are somewhat similar, go with one maven/gradle project having
several modules. Shared code should go into a *common* module. You should
have a deploy directory per module.

Note that I'd recommend Table API to implement the jobs as you can use the
simpler Option A much longer. You can easily it configurable to: a) join
from multiple sources, b) group by a varying number of fields, c) have
different aggregation functions, d) use different transformation...

On Tue, Feb 23, 2021 at 10:56 PM Sweta Kalakuntla 
wrote:

> Hi,
>
> I am going to have to implement many similar jobs. I need guidance and
> examples that you may have for organizing them in the Git repository
> without having to have one repo per job.
>
> Thanks,
> SK
>
> --
>
>
>


AW: Kafka SQL Connector: dropping events if more partitions then source tasks

2021-02-24 Thread Jan Oelschlegel
Hi Arvid,

thanks for bringing back this topic.

Yes, I’m running on historic data, but as you mentioned that should not be the 
problem, even there is a event-time skew between partitions.

But maybe this issue with the missing watermark pushdown per partition  is the 
important fact:

https://issues.apache.org/jira/browse/FLINK-20041


Best,
Jan

Von: Arvid Heise 
Gesendet: Mittwoch, 24. Februar 2021 14:10
An: Jan Oelschlegel 
Cc: user ; Timo Walther 
Betreff: Re: Kafka SQL Connector: dropping events if more partitions then 
source tasks

Hi Jan,

Are you running on historic data? Then your partitions might drift apart 
quickly.

However, I still suspect that this is a bug (Watermark should only be from the 
slowest partition). I'm pulling in Timo who should know more.



On Fri, Feb 19, 2021 at 10:50 AM Jan Oelschlegel 
mailto:oelschle...@integration-factory.de>> 
wrote:
If i increase the watermark, the dropped events getting lower. But why is the 
DataStream API Job still running with 12 hours watermark delay?

By the way, I’m using Flink 1.11. It would be nice if someone could give me 
some advice.

Best,
Jan

Von: Jan Oelschlegel 
mailto:oelschle...@integration-factory.de>>
Gesendet: Donnerstag, 18. Februar 2021 09:51
An: Jan Oelschlegel 
mailto:oelschle...@integration-factory.de>>;
 user mailto:user@flink.apache.org>>
Betreff: AW: Kafka SQL Connector: dropping events if more partitions then 
source tasks

By  using the DataStream API with the same business logic I’m getting no 
dropped events.

Von: Jan Oelschlegel 
mailto:oelschle...@integration-factory.de>>
Gesendet: Mittwoch, 17. Februar 2021 19:18
An: user mailto:user@flink.apache.org>>
Betreff: Kafka SQL Connector: dropping events if more partitions then source 
tasks

Hi,

i have a question regarding FlinkSQL connector for Kafka. I have 3 Kafka 
partitions and 1 Kafka SQL source connector (Parallelism 1). The data within 
the Kafka parttitons are sorted based on a event-time field, which is also my 
event-time in Flink. My Watermark is generated with a delay of 12 hours

WATERMARK FOR eventtime as eventtime - INTERVAL '12' HOUR


But the problem is that I see dropping events due arriving late in Prometheus.  
But with parallelism of 3  there are no drops.

Do I always have to have as much source-tasks as I have Kafka partitions?



Best,
Jan
HINWEIS: Dies ist eine vertrauliche Nachricht und nur für den Adressaten 
bestimmt. Es ist nicht erlaubt, diese Nachricht zu kopieren oder Dritten 
zugänglich zu machen. Sollten Sie diese Nachricht irrtümlich erhalten haben, 
bitte ich um Ihre Mitteilung per E-Mail oder unter der oben angegebenen 
Telefonnummer.
HINWEIS: Dies ist eine vertrauliche Nachricht und nur für den Adressaten 
bestimmt. Es ist nicht erlaubt, diese Nachricht zu kopieren oder Dritten 
zugänglich zu machen. Sollten Sie diese Nachricht irrtümlich erhalten haben, 
bitte ich um Ihre Mitteilung per E-Mail oder unter der oben angegebenen 
Telefonnummer.
HINWEIS: Dies ist eine vertrauliche Nachricht und nur für den Adressaten 
bestimmt. Es ist nicht erlaubt, diese Nachricht zu kopieren oder Dritten 
zugänglich zu machen. Sollten Sie diese Nachricht irrtümlich erhalten haben, 
bitte ich um Ihre Mitteilung per E-Mail oder unter der oben angegebenen 
Telefonnummer.


Re: Configure operator based on key

2021-02-24 Thread Arvid Heise
Hi Abhinav,

I think there is no way and I don't easily see how that could be added. You
can however apply the same logic in the trigger also in your process
function to detect which case caused the trigger.
If that is expensive to calculate, you might want to do it in a map before
entering the window and have a trivial trigger.

input -> map(case detection) -> window (trigger = check if any case has
been detected, process function = switch over case)

On Tue, Feb 23, 2021 at 4:05 AM Abhinav Sharma 
wrote:

> Hi Yidan,
>
> Thank you for your reply. I was wondering if there is some way that the
> process function can kiw which condition fired the trigger.
>
> Eg: If I set trigger to fire when he object associated with key have value
> 2, 8, 10 (3 conditions for the trigger to fire), then if he process
> function, I want to operate differently on them.
>
> On Mon, Feb 22, 2021, 11:23 AM yidan zhao  wrote:
>
>> You can self-define it using keyedStream.window(GlobalWindows.create()
>> ).trigger(self-defined-trigger).
>>
>> Abhinav Sharma  于2021年2月21日周日 下午3:57写道:
>>
>>> Hi,
>>>
>>> Is there some way that I can configure an operator based on the key in a
>>> stream?
>>> Eg: If the key is 'abcd', then create a window of size X counts, if the
>>> key is 'bfgh', then create a window of size Y counts.
>>>
>>> Is this scenario possible in flink
>>>
>>>


Re: Object and Integer size in RocksDB ValueState

2021-02-24 Thread Roman Khachatryan
Thanks for the clarification.

RocksDB stores whatever value Flink passes to it after serialization.
The value is passed as an array of bytes so the minimum is single byte.
Integer would require 4 bytes, Object - 1 or 2 depending on the serializer
(Pojo or Kryo), and boolean just 1 byte.
Besides that, boolean serialization is apparently faster.

Sizes in memory, on disk and of snapshot are all affected proportionally.

You are right regarding Flink compression settings will not have any impact
with incremental checkpoints.

Regards,
Roman


On Wed, Feb 24, 2021 at 11:01 AM Maciej Obuchowski <
obuchowski.mac...@gmail.com> wrote:

> Hey. Let me send simplified example, because I don't think this
> "(given that the actual stored objects (integers) are the same)" is
> true - I'm just storing object as a placeholder:
>
> public class DeduplicationProcessFunction extends
> KeyedProcessFunction implements CheckpointedFunction {
>
> private transient ValueState processedState;
>
> public DeduplicationProcessFunction() { }
>
> @Override
> public void snapshotState(FunctionSnapshotContext context) throws
> Exception { }
>
> @Override
> public void initializeState(FunctionInitializationContext context)
> throws Exception {
> val descriptor = new ValueStateDescriptor<>("processed",
> TypeInformation.of(Object.class));
> processedState = context.getKeyedStateStore().getState(descriptor);
> }
>
> @Override
> public void processElement(IN value, Context ctx, Collector
> out) throws Exception {
> val processed = processedState.value();
> if (processed == null) {
> processedState.update(new Object());
> out.collect(value);
> }
> }
> }
>
>
>
> Basically, I'm not sure what rocksdb stores in this case. I'm sure
> that it needs to store key, which is 32byte sha key in this case.
> What's the value? Is it the 16 bytes that Java requires in-memory? If
> I'll change my ValueState to integer, and provide additional value
> there, will it require more storage space? Also, to respond to your
> point about compression, we're using incremental checkpoints, so I
> don't think anything will change as per docs. I'm not only interested
> in snapshot size, but also size of current, in memory and local disk
> state.
>
> Thanks,
> Maciej
>
>
>
> wt., 23 lut 2021 o 17:53 Roman Khachatryan  napisał(a):
> >
> > Hi Maciej,
> >
> > If I understand correctly, you're asking whether ValueState
> parameterized with Object has the same size as the one with Integer (given
> that the actual stored objects (integers) are the same).
> > With RocksDB, any state object is serialized first and only then it is
> stored in MemTable or in an SST file. So it doesn't matter as long as the
> same serializer is used.
> >
> > You probably should try enabling compression if you didn't already:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression
> >
> > Regards,
> > Roman
> >
> >
> > On Tue, Feb 23, 2021 at 12:40 PM Maciej Obuchowski <
> obuchowski.mac...@gmail.com> wrote:
> >>
> >> Hey.
> >>
> >> We have deduplication job that has a large amount of keyed ValueState.
> We want to decrease state size as much as possible, so we're using
> ValueState as it's smallest possible Java non-primitive. However,
> as per https://www.baeldung.com/java-size-of-object (and my measurements)
> Java Integer has the same memory size as Object due to padding.
> >> Will this still be true with RocksDB state? Can we put Integer in state
> without increasing state size?
> >>
> >> Thanks, Maciej
>


Re: stop job with Savepoint

2021-02-24 Thread Arvid Heise
Hi Alexey,

The list looks complete to me. Please report back if this is not correct.

On Sat, Feb 20, 2021 at 11:30 PM Alexey Trenikhun  wrote:

> Adding "list" to verbs helps, do I need to add anything else ?
>
> --
> *From:* Alexey Trenikhun 
> *Sent:* Saturday, February 20, 2021 2:10 PM
> *To:* Flink User Mail List 
> *Subject:* stop job with Savepoint
>
> Hello,
> I'm running per job Flink cluster, JM is deployed as Kubernetes Job
> with restartPolicy: Never, highavailability is KubernetesHaServicesFactory.
> Job runs fine for some time, configmaps are created etc.  Now in order to
> upgrade Flink job, I'm trying to stop job with savepoint (flink
> stop $JOB_ID), JM exits with code 2, from log:
>
> *{"ts":"2021-02-20T21:34:18.195Z","message":"Terminating cluster
> entrypoint process StandaloneApplicationClusterEntryPoint with exit code
> 2.","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":2,"stack_trace":"java.util.concurrent.ExecutionException:
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing:
> GET at:
> https://10.96.0.1/api/v1/namespaces/n/configmaps?labelSelector=app%3Dfsp%2Cconfigmap-type%3Dhigh-availability%2Ctype%3Dflink-native-kubernetes
> .
> Message: Forbidden!Configured service account doesn't have access. Service
> account may have been revoked. configmaps is forbidden: User
> \"system:serviceaccount:n:fsp\" cannot list resource \"configmaps\" in API
> group \"\" in the namespace \"n\".\n\tat
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)\n\tat
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat
> org.apache.flink.kubernetes.highavailability.KubernetesHaServices.internalCleanup(KubernetesHaServices.java:142)\n\tat
> org.apache.flink.runtime.highavailability.AbstractHaServices.closeAndCleanupAllData(AbstractHaServices.java:180)\n\tat
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.stopClusterServices(ClusterEntrypoint.java:378)\n\tat
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$shutDownAsync$3(ClusterEntrypoint.java:467)\n\tat
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$composeAfterwards$19(FutureUtils.java:704)\n\tat
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)\n\tat
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)\n\tat
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)\n\tat
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$null$18(FutureUtils.java:715)\n\tat
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)\n\tat
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)\n\tat
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)\n\tat
> org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent.lambda$closeAsyncInternal$3(DispatcherResourceManagerComponent.java:182)\n\tat
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)\n\tat
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)\n\tat
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)\n\tat
> org.apache.flink.runtime.concurrent.FutureUtils$CompletionConjunctFuture.completeFuture(FutureUtils.java:956)\n\tat
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)\n\tat
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)\n\tat
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)\n\tat
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$forwardTo$22(FutureUtils.java:1323)\n\tat
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)\n\tat
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)\n\tat
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
> java.lang.Thread.run(Thread.java:748)\nCaused by:
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing:
> GET at:
> 

Re: Flink job finished unexpected

2021-02-24 Thread Arvid Heise
Hi Rainie,

there are two probably causes:
* Network instabilities
* Taskmanager died, then you can further dig in the taskmanager logs for
errors right before that time.

In both cases, Flink should restart the job with the correct restart
policies if configured.

On Sat, Feb 20, 2021 at 10:07 PM Rainie Li  wrote:

> Hello,
>
> I launched a job with a larger load on hadoop yarn cluster.
> The Job finished after running 5 hours, I didn't find any error from
> JobManger log besides this connect exception.
>
>
>
>
>
> *2021-02-20 13:20:14,110 WARN  akka.remote.transport.netty.NettyTransport
>- Remote connection to [/10.1.57.146:48368
> ] failed with java.io.IOException: Connection
> reset by peer2021-02-20 13:20:14,110 WARN
>  akka.remote.ReliableDeliverySupervisor-
> Association with remote system [akka.tcp://flink-metrics@host:35241] has
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> 2021-02-20 13:20:14,110 WARN  akka.remote.ReliableDeliverySupervisor
>  - Association with remote system
> [akka.tcp://flink@host:39493] has failed, address is now gated for [50] ms.
> Reason: [Disassociated] 2021-02-20 13:20:14,110 WARN
>  akka.remote.ReliableDeliverySupervisor-
> Association with remote system [akka.tcp://flink-metrics@host:38481] has
> failed, address is now gated for [50] ms. Reason: [Disassociated] *
>
> Any idea what caused the job to be finished and how to resolve it?
> Any suggestions are appreciated.
>
> Thanks
> Best regards
> Rainie
>


Re: How to pass PROCTIME through an aggregate

2021-02-24 Thread Arvid Heise
Hi Rex,

just an idea, wouldn't it be possible to just add

UNIX_TIMESTAMP()

right before your window operation?



On Sat, Feb 20, 2021 at 2:14 AM Rex Fenley  wrote:

> Hello,
>
> Using the table api, I have a CREATE DDL which adds a PROCTIME() column
> and I need to use it deep (like 10 operators deep) in our plan. However, I
> explicitly do not want to use it before that point.
>
> The problem I'm running into is for any group by UDAF I use I then lose
> the proctime column. I do not want to group by proctime there (it is
> unwindowed at that point) nor do I want to agg over proctime, I just want
> to pass the proctime column through until later when I actually use it in a
> window.
>
> My question is, how do I forward proctime through group-by UDAFs without
> interfering with my plan?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


Re: WatermarkStrategy.for_bounded_out_of_orderness in table API

2021-02-24 Thread joris.vanagtmaal
Ah thanks, so event though the method of describing it is exactly the same,
because you're using the max resolution it isn't useful for
out-of-orderness. Ok, clear 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Netty LocalTransportException: Sending the partition request to 'null' failed

2021-02-24 Thread Arvid Heise
Hi Matthias,

most of the debug statements are just noise. You can ignore that.

Something with your network seems fishy to me. Either taskmanager 1 cannot
connect to taskmanager 2 (and vice versa), or the taskmanager cannot
connect locally.

I found this fragment, which seems suspicious

Failed to connect to /127.0.*1*.1:32797. Giving up.

localhost is usually 127.0.0.1. Can you double check that you connect from
all machines to all machines (including themselves) by opening trivial text
sockets on random ports?

On Fri, Feb 19, 2021 at 10:59 AM Matthias Seiler <
matthias.sei...@campus.tu-berlin.de> wrote:

> Hi Till,
>
> thanks for the hint, you seem about right. Setting the log level to DEBUG
> reveals more information, but I don't know what to do about it.
>
> All logs throw some Java related exceptions:
> `java.lang.UnsupportedOperationException: Reflective setAccessible(true)
> disabled`
> and
> `java.lang.IllegalAccessException: class
> org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0$6
> cannot access class jdk.internal.misc.Unsafe (in module java.base) because
> module java.base does not export jdk.internal.misc to unnamed module`
>
> The log of node-2's TaskManager reveals connection problems:
> `org.apache.flink.runtime.net.ConnectionUtils [] - Failed
> to connect from address 'node-2/127.0.1.1': Invalid argument (connect
> failed)`
> `java.net.ConnectException: Invalid argument (connect failed)`
>
> What's more, both TaskManagers (node-1 and node-2) are having trouble to
> load `org_apache_flink_shaded_netty4_netty_transport_native_epoll_x86_64`,
> but load some version eventually.
>
>
> There is quite a lot going on here that I don't understand. Can you (or
> someone) shed some light on it and tell me what I could try?
>
> Some more information:
> I appended the following to the `/etc/hosts` file:
> ```
>  node-1
>  node-2
> ```
> And the `flink/conf/workers` consists of:
> ```
> node-1
> node-2
> ```
>
> Thanks,
> Matthias
>
> P.S. I attached the logs for further reference. `` is of course
> the real IP address instead.
>
>
> On 2/16/21 1:56 PM, Till Rohrmann wrote:
>
> Hi Matthias,
>
> Can you make sure that node-1 and node-2 can talk to each other? It looks
> to me that node-2 fails to open a connection to the other TaskManager.
> Maybe the logs give some more insights. You can change the log level to
> DEBUG to gather more information.
>
> Cheers,
> Till
>
> On Tue, Feb 16, 2021 at 10:57 AM Matthias Seiler <
> matthias.sei...@campus.tu-berlin.de> wrote:
>
>> Hi Everyone,
>>
>> I'm trying to setup a Flink cluster in standealone mode with two
>> machines. However, running a job throws the following exception:
>> `org.apache.flink.runtime.io
>> .network.netty.exception.LocalTransportException:
>> Sending the partition request to 'null' failed`
>>
>> Here is some background:
>>
>> Machines:
>> - node-1: JobManager, TaskManager
>> - node-2: TaskManager
>>
>> flink-conf-yaml looks like this:
>> ```
>> jobmanager.rpc.address: node-1
>> taskmanager.numberOfTaskSlots: 8
>> parallelism.default: 2
>> cluster.evenly-spread-out-slots: true
>> ```
>>
>> Deploying the cluster works: I can see both TaskManagers in the WebUI.
>>
>> I ran the streaming WordCount example: `flink run
>> flink-1.12.1/examples/streaming/WordCount.jar --input lorem-ipsum.txt`
>> - the job has been submitted
>> - job failed (with the above exception)
>> - the log of the node-2 also shows the exception, the other logs are
>> fine (graceful stop)
>>
>> I played around with the config and observed that
>> - if parallelism is set to 1, node-1 gets all the slots and node-2 none
>> - if parallelism is set to 2, each TaskManager occupies 1 TaskSlot (but
>> fails because of node-2)
>>
>> I suspect, that the problem must be with the communication between
>> TaskManagers
>> - job runs successful if
>> - node-1 is the **only** node with x TaskManagers (tested with x=1
>> and x=2)
>> - node-2 is the **only** node with x TaskManagers (tested with x=1
>> and x=2)
>> - job fails if
>> - node-1 **and** node-2 have one TaskManager
>>
>> The full exception is:
>> ```
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error:
>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>> (JobID: 547d4d29b3650883aa403cdb5eb1ba5c)
>> // ... Job failed, Recovery is suppressed by
>> NoRestartBackoffTimeStrategy, ...
>> Caused by:
>> org.apache.flink.runtime.io
>> .network.netty.exception.LocalTransportException:
>> Sending the partition request to 'null' failed.
>> at
>> org.apache.flink.runtime.io
>> .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137)
>> at
>> org.apache.flink.runtime.io
>> .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125)
>> at
>>
>> 

Re: Kafka SQL Connector: dropping events if more partitions then source tasks

2021-02-24 Thread Arvid Heise
Hi Jan,

Are you running on historic data? Then your partitions might drift apart
quickly.

However, I still suspect that this is a bug (Watermark should only be from
the slowest partition). I'm pulling in Timo who should know more.



On Fri, Feb 19, 2021 at 10:50 AM Jan Oelschlegel <
oelschle...@integration-factory.de> wrote:

> If i increase the watermark, the dropped events getting lower. But why is
> the DataStream API Job still running with 12 hours watermark delay?
>
> By the way, I’m using Flink 1.11. It would be nice if someone could give
> me some advice.
>
>
>
> Best,
>
> Jan
>
>
>
> *Von:* Jan Oelschlegel 
> *Gesendet:* Donnerstag, 18. Februar 2021 09:51
> *An:* Jan Oelschlegel ; user <
> user@flink.apache.org>
> *Betreff:* AW: Kafka SQL Connector: dropping events if more partitions
> then source tasks
>
>
>
> By  using the DataStream API with the same business logic I’m getting no
> dropped events.
>
>
>
> *Von:* Jan Oelschlegel 
> *Gesendet:* Mittwoch, 17. Februar 2021 19:18
> *An:* user 
> *Betreff:* Kafka SQL Connector: dropping events if more partitions then
> source tasks
>
>
>
> Hi,
>
>
>
> i have a question regarding FlinkSQL connector for Kafka. I have 3 Kafka
> partitions and 1 Kafka SQL source connector (Parallelism 1). The data
> within the Kafka parttitons are sorted based on a event-time field, which
> is also my event-time in Flink. My Watermark is generated with a delay of
> 12 hours
>
>
>
> WATERMARK FOR eventtime as eventtime - INTERVAL '12' HOUR
>
>
>
>
>
> But the problem is that I see dropping events due arriving late in
> Prometheus.  But with parallelism of 3  there are no drops.
>
>
>
> Do I always have to have as much source-tasks as I have Kafka partitions?
>
>
>
>
>
>
>
> Best,
>
> Jan
>
> HINWEIS: Dies ist eine vertrauliche Nachricht und nur für den Adressaten
> bestimmt. Es ist nicht erlaubt, diese Nachricht zu kopieren oder Dritten
> zugänglich zu machen. Sollten Sie diese Nachricht irrtümlich erhalten
> haben, bitte ich um Ihre Mitteilung per E-Mail oder unter der oben
> angegebenen Telefonnummer.
> HINWEIS: Dies ist eine vertrauliche Nachricht und nur für den Adressaten
> bestimmt. Es ist nicht erlaubt, diese Nachricht zu kopieren oder Dritten
> zugänglich zu machen. Sollten Sie diese Nachricht irrtümlich erhalten
> haben, bitte ich um Ihre Mitteilung per E-Mail oder unter der oben
> angegebenen Telefonnummer.
>


Re: Flink 1.11.3 not able to restore with savepoint taken on Flink 1.9.3

2021-02-24 Thread Arvid Heise
A common pitiful when upgrading a Flink application with savepoints is that
no explicit UIDs have been assigned to the operators. You can amend that by
first adding UIDs to the job in 1.9.3 and create a savepoint with UIDs.
Then try upgrading again.

On Fri, Feb 19, 2021 at 9:57 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> I'm not aware of any breaking changes in the savepoint formats from 1.9.3
> to
> 1.11.3.
>
> Let's first try to rule out any obvious causes of this:
> - Were any data types / classes that were used in state changed across the
> restores? Remember that keys types are also written as part of state
> snapshots.
> - Did you register any Kryo types in the 1.9.3 execution, had changed those
> configuration across the restores?
> - Was unaligned checkpointing enabled in the 1.11.3 restore?
>
> As of now it's a bit hard to debug this with just an EOFException, as the
> corrupted read could have happened anywhere before that point. If it's
> possible to reproduce a minimal job of yours that has the same restore
> behaviour, that could also help a lot.
>
> Thanks,
> Gordon
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: latency related to the checkpointing mode EXACTLY ONCE

2021-02-24 Thread Arvid Heise
When Flink fails and restarts, it goes back in time to reprocess the data
of the latest checkpoint. That's why it also deleted all uncommitted data
on restart or else you would receive duplicates in your output.
Hence, to get exactly once, you cannot read uncommitted data. That is true
for all streaming systems and sinks that depend on transactions.

In general, low latency and exactly once are contradicting each other a
bit. In Flink, you can only get it in a meaningful way if your
checkpointing interval is very low, which is currently only possible if
your state is very small (no big join windows for example). We are working
on improving that limitation though.

One solution if you need low latency is to drop exactly once and
deduplicate events in your downstream application.
On Fri, Feb 19, 2021 at 9:55 AM Tan, Min  wrote:

> Many thanks for your quick response.
>
>
>
> The config read_commit for the kafka consumers is required by the exactly
> once (EOS)?
>
> No exactly once if we read un committed messages?
>
>
>
> Regards,
>
> Min
>
>
>
> *From:* Chesnay Schepler 
> *Sent:* Thursday, February 18, 2021 8:27 PM
> *To:* Tan, Min ; user 
> *Subject:* [External] Re: latency related to the checkpointing mode
> EXACTLY ONCE
>
>
>
> Yes, if you are only reading committed data than it will take least the
> checkpoint interval for the data to be available to downstream consumers.
>
>
>
> On 2/18/2021 6:17 PM, Tan, Min wrote:
>
> Hi,
>
>
>
> We use the checkpointing mode EXACTLY ONCE for some of our flink jobs.
>
>
>
> I wonder how the checkpoint configurations specially its checkpoint
> interval are related to the end to end latency.
>
>
>
> We need to setup read_commit true for the kafak consumers.
>
>
>
> Does this lead a latency from one flink job is greater than that of
> checkpoint interval?
>
>
>
> Thank you very much for your help in advance.
>
>
>
> Min
>
>
>


Re: How to use ProcessWindowFunction in pyflink?

2021-02-24 Thread Arvid Heise
Hi Hongyuan,

it seems as if PyFlink's datastream API is still lacking window support
[1], which is targeted for next release.

Examples for windows in PyFlink's table API are available here [2].

from pyflink.table.window import Tumblefrom pyflink.table.expressions
import lit, col
orders = t_env.from_path("Orders")result =
orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w"))
\
   .group_by(orders.a, col('w')) \
   .select(orders.a, col('w').start, col('w').end,
orders.b.sum.alias('d'))



[1] https://issues.apache.org/jira/browse/FLINK-21202
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/operations.html#aggregations

On Fri, Feb 19, 2021 at 8:26 AM Hongyuan Ma  wrote:

> Greetings,
>
> I am a newbie to pyflink. I want to be able to use processWindowFunction
> in a Tumble Window, and finally output 0 or more lines. I have checked the
> datastreamAPI and TableAPI of pyflink, but have not found a complete
> example. pyflink's datastream API does not seem to implement window() yet.
> And I'm not sure how to use TableAPI.
>
> If I use java to implement "public class MyProcessWindowFunctextends
> ProcessWindowFunction” and registered as udf in python,
> is it possible to call it through select statement in pyflink? Can the
> select statement correctly return zero or more rows of results?
>
> Any help will be appreciated!
>
> -
> Best Regards,
> Hongyuan Ma
>


Re: WatermarkStrategy.for_bounded_out_of_orderness in table API

2021-02-24 Thread joris.vanagtmaal
Thanks Roman,

somehow i must have missed this in the documentation. 

What is the difference (if any) between:

Ascending timestamps: 
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND.

Bounded out of orderness timestamps: 
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit.





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Re: 社区有人实现过Flink的MongodbSource吗?

2021-02-24 Thread 林影
请问flink的mongodb connector这块后续有计划吗

Evan  于2021年2月24日周三 下午5:08写道:

> 好的,十分感谢,我调研一下,之前网上搜了一些资料,实现的只能批量读取,读完程序就停止了,不能一直实时的增量读取
>
>
>
>
> 发件人: Paul Lam
> 发送时间: 2021-02-24 17:03
> 收件人: user-zh
> 主题: Re: 社区有人实现过Flink的MongodbSource吗?
> Hi,
>
> Debezium 支持 MongoDB CDC[1],可以了解下。
>
> [1] https://debezium.io/documentation/reference/connectors/mongodb.html
>
> Best,
> Paul Lam
>
> > 2021年2月24日 16:23,Evan  写道:
> >
> >
> > 有人完整的实现Flink的MongodbSource吗
> > 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化
> >
> >
>
>


Co-relate two streams

2021-02-24 Thread Abhinav Sharma
Hi,

How can I co-relate two streams of different types in Flink?
Scenario: In stream1, I have data in pojo with a field user. In stream2, I
have data in a different pojo which also contains the field user. (However,
other than the user field, they have no common field).

Now what I want to do is relate the two streams such that for every event
in stream1, I want to collect events from stream2 where the user is the
same. Both stream1 and stream2 are unbounded.

I tried using
stream1.connect(stream2).process(new CoProcessFunction) {
private String user;

public void processElement1(Type1 inp, CoProcessFunction.Context ctx, Collector out)  {
user = inp.getUser();
}

public void processElement2(Type2 inp, CoProcessFunction.Context ctx, Collector out)  {
if (user.equals(inp.getUser())) {
out.collect(inp);
}
}
});

But this works only and only if both elements occur simultaneously.

How can I collect the cases with history? Is using ListState required?
Is there some better way to this in Flink?


Requesting help,
Abhinav


Re: Community chat?

2021-02-24 Thread Sebastián Magrí
I agree with Yuval.

If we wanted to keep chats in the open source world, there's also Matrix
nowadays which works quite well.

On Wed, 24 Feb 2021 at 09:58, Yuval Itzchakov  wrote:

> Both have their place IMO.
>
> There's a lot of value in synchronous communication for which I'd prefer
> Slack or Discord.
> For async communication, I think moving away from mailing lists into
> something like a Discourse forum would be good.
>
> On Wed, Feb 24, 2021 at 11:36 AM Marta Paes Moreira 
> wrote:
>
>> Ah! That freenode channel dates back to...2014? The community is not
>> maintaining any channels other than the Mailing List (and Stack Overflow),
>> currently.
>>
>> But this is something we're looking into, as it's coming up more and more
>> frequently. Would Slack be your first pick? Or would something async but
>> easier to interact with also work, like a Discourse forum?
>>
>> Thanks for bringing this up!
>>
>> Marta
>>
>>
>>
>> On Mon, Feb 22, 2021 at 10:03 PM Yuval Itzchakov 
>> wrote:
>>
>>> A dedicated Slack would be awesome.
>>>
>>> On Mon, Feb 22, 2021, 22:57 Sebastián Magrí 
>>> wrote:
>>>
 Is there any chat from the community?

 I saw the freenode channel but it's pretty dead.

 A lot of the time a more chat alike venue where to discuss stuff
 synchronously or just share ideas turns out very useful and estimulates the
 community.

 --
 Sebastián Ramírez Magrí

>>>
>
> --
> Best Regards,
> Yuval Itzchakov.
>


-- 
Sebastián Ramírez Magrí


Re: Object and Integer size in RocksDB ValueState

2021-02-24 Thread Maciej Obuchowski
Hey. Let me send simplified example, because I don't think this
"(given that the actual stored objects (integers) are the same)" is
true - I'm just storing object as a placeholder:

public class DeduplicationProcessFunction extends
KeyedProcessFunction implements CheckpointedFunction {

private transient ValueState processedState;

public DeduplicationProcessFunction() { }

@Override
public void snapshotState(FunctionSnapshotContext context) throws
Exception { }

@Override
public void initializeState(FunctionInitializationContext context)
throws Exception {
val descriptor = new ValueStateDescriptor<>("processed",
TypeInformation.of(Object.class));
processedState = context.getKeyedStateStore().getState(descriptor);
}

@Override
public void processElement(IN value, Context ctx, Collector
out) throws Exception {
val processed = processedState.value();
if (processed == null) {
processedState.update(new Object());
out.collect(value);
}
}
}



Basically, I'm not sure what rocksdb stores in this case. I'm sure
that it needs to store key, which is 32byte sha key in this case.
What's the value? Is it the 16 bytes that Java requires in-memory? If
I'll change my ValueState to integer, and provide additional value
there, will it require more storage space? Also, to respond to your
point about compression, we're using incremental checkpoints, so I
don't think anything will change as per docs. I'm not only interested
in snapshot size, but also size of current, in memory and local disk
state.

Thanks,
Maciej



wt., 23 lut 2021 o 17:53 Roman Khachatryan  napisał(a):
>
> Hi Maciej,
>
> If I understand correctly, you're asking whether ValueState parameterized 
> with Object has the same size as the one with Integer (given that the actual 
> stored objects (integers) are the same).
> With RocksDB, any state object is serialized first and only then it is stored 
> in MemTable or in an SST file. So it doesn't matter as long as the same 
> serializer is used.
>
> You probably should try enabling compression if you didn't already: 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression
>
> Regards,
> Roman
>
>
> On Tue, Feb 23, 2021 at 12:40 PM Maciej Obuchowski 
>  wrote:
>>
>> Hey.
>>
>> We have deduplication job that has a large amount of keyed ValueState. We 
>> want to decrease state size as much as possible, so we're using 
>> ValueState as it's smallest possible Java non-primitive. However, as 
>> per https://www.baeldung.com/java-size-of-object (and my measurements) Java 
>> Integer has the same memory size as Object due to padding.
>> Will this still be true with RocksDB state? Can we put Integer in state 
>> without increasing state size?
>>
>> Thanks, Maciej


Re: Community chat?

2021-02-24 Thread Yuval Itzchakov
Both have their place IMO.

There's a lot of value in synchronous communication for which I'd prefer
Slack or Discord.
For async communication, I think moving away from mailing lists into
something like a Discourse forum would be good.

On Wed, Feb 24, 2021 at 11:36 AM Marta Paes Moreira 
wrote:

> Ah! That freenode channel dates back to...2014? The community is not
> maintaining any channels other than the Mailing List (and Stack Overflow),
> currently.
>
> But this is something we're looking into, as it's coming up more and more
> frequently. Would Slack be your first pick? Or would something async but
> easier to interact with also work, like a Discourse forum?
>
> Thanks for bringing this up!
>
> Marta
>
>
>
> On Mon, Feb 22, 2021 at 10:03 PM Yuval Itzchakov 
> wrote:
>
>> A dedicated Slack would be awesome.
>>
>> On Mon, Feb 22, 2021, 22:57 Sebastián Magrí  wrote:
>>
>>> Is there any chat from the community?
>>>
>>> I saw the freenode channel but it's pretty dead.
>>>
>>> A lot of the time a more chat alike venue where to discuss stuff
>>> synchronously or just share ideas turns out very useful and estimulates the
>>> community.
>>>
>>> --
>>> Sebastián Ramírez Magrí
>>>
>>

-- 
Best Regards,
Yuval Itzchakov.


Re: FLINK 消费kafka 报错java.lang.OutOfMemoryError: Direct buffer memory

2021-02-24 Thread Qishang
Hi .
我也遇到了这个问题, 最后怎么解决的? 版本 Flink 1.12.1 .

flink2021  于2021年2月19日周五 下午12:39写道:

> 嗯,我猜测也是,估计是我们kafka某些参数需要调整。大佬可以帮忙看看你们一般的kafka配置是什么样的呢?
> JVM :export KAFKA_HEAP_OPTS="-Xmx14G -Xms14G -server -XX:+UseG1GC
> -XX:MaxDirectMemorySize=8192m"
> 其它也就是写常规的配置:
> og.segment.bytes=1073741824
> log.retention.check.interval.ms=30
> #broker能接收消息的最大字节数
> message.max.bytes=2
> #broker可复制的消息的最大字节数
> replica.fetch.max.bytes=204857600
> #消费者端的可读取的最大消息
> fetch.message.max.bytes=204857600
> max.poll.records=500
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


[UPDATE] Release 1.13 feature freeze

2021-02-24 Thread Dawid Wysakowicz
Hi all,

The agreed date of a feature freeze is due in about a month. Therefore
we thought it would be a good time to give an update of the current
progress.

From the information we gathered there are currently no known obstacles
or foreseeable delays. We are still aiming for the end of March as the
date for a feature freeze.

Some update on the progress of particular issues. Most of the issues
listed in the https://cwiki.apache.org/confluence/x/jAhRCg are well on
track. We will list only the issues that we feel might not end up in the
release or end up there partially:

  * Tolerate temporarily suspended ZooKeeper connections (FLINK-10052)
  * Support shipping local/remote files for yarn/k8s
integrations(FLINK-20681 / FLINK-20811 / FLINK-20867)
  * Consistent Flink SQL time function behavior (FLIP-162)

These are long lasting efforts and are planned to be delivered based on
best effort, therefore they might not be fully functional in 1.13:

  * Generalized incremental checkpoints
  * Incremental snapshots for heap-based state backend

We have 4 release blockers as of today:

Checkpointing:

  * https://issues.apache.org/jira/browse/FLINK-21453
BoundedOneInput.endInput is NOT called when doing stop with
savepoint WITH drain

Dependencies:

  * https://issues.apache.org/jira/browse/FLINK-21152 Bump flink-shaded
to 13.0

Coordination:

  * https://issues.apache.org/jira/browse/FLINK-21030 Broken job restart
for job with disjoint graph

Connectors:

  * https://issues.apache.org/jira/browse/FLINK-20188 Add Documentation
for new File Source

We have 100 test instabilities across all versions out of which 37
occurred on master (1.13) branch.

Let me know if we made a mistake somewhere or something is inaccurate.
Moreover everyone is welcome to give more thorough update if you feel
like so.

Your release managers

Guowei & Dawid




OpenPGP_signature
Description: OpenPGP digital signature


Re: Community chat?

2021-02-24 Thread Marta Paes Moreira
Ah! That freenode channel dates back to...2014? The community is not
maintaining any channels other than the Mailing List (and Stack Overflow),
currently.

But this is something we're looking into, as it's coming up more and more
frequently. Would Slack be your first pick? Or would something async but
easier to interact with also work, like a Discourse forum?

Thanks for bringing this up!

Marta



On Mon, Feb 22, 2021 at 10:03 PM Yuval Itzchakov  wrote:

> A dedicated Slack would be awesome.
>
> On Mon, Feb 22, 2021, 22:57 Sebastián Magrí  wrote:
>
>> Is there any chat from the community?
>>
>> I saw the freenode channel but it's pretty dead.
>>
>> A lot of the time a more chat alike venue where to discuss stuff
>> synchronously or just share ideas turns out very useful and estimulates the
>> community.
>>
>> --
>> Sebastián Ramírez Magrí
>>
>


Re: Window Process function is not getting trigger

2021-02-24 Thread Kezhu Wang
Try `env.setParallelism(1)`. Default parallelism for local environment is
`Runtime.getRuntime.availableProcessors`.

You test data set are so small that when they are scatter cross multiple
parallel instances, there will be no data with event time assigned to
trigger downstream computation.

Or you could try `WatermarkStrategy.withIdleness`.


Best,
Kezhu Wang

On February 24, 2021 at 15:43:47, sagar (sagarban...@gmail.com) wrote:

It is fairly simple requirement, if I changed it to PRocessing time it
works fine , but not working with event time..help appreciated!

On Wed, Feb 24, 2021 at 10:51 AM sagar  wrote:

> HI
>
> Corrected with below code, but still getting same issue
>
> Instant instant = 
> p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant();
> long timeInMillis = instant.toEpochMilli();
> System.out.println(timeInMillis);
> return timeInMillis;
>
>
> On Wed, Feb 24, 2021 at 10:34 AM Kezhu Wang  wrote:
>
>> I saw one potential issue. Your timestamp assigner returns timestamp in
>> second resolution while Flink requires millisecond resolution.
>>
>>
>> Best,
>> Kezhu Wang
>>
>> On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote:
>>
>> I have simple flink stream program, where I am using socket as my
>> continuous source
>> I have window size of 2 seconds.
>>
>> Somehow my window process function is not triggering and even if I pass
>> events in any order, flink is not ignoring
>>
>> I can see the output only when I kill my socket , please find the code
>> snippet below
>>
>> final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>
>>
>> DataStream price = env.socketTextStream("localhost",
>> 9998).uid("price source").map(new MapFunction() {
>> @Override
>> public Price map(String s) throws Exception {
>> return new Price(s.split(",")[0],
>> LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new
>> BigDecimal(s.split(",")[3]), s.split(",")[4], new
>> BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) );
>> }
>> }
>> );
>>
>> DataStream priceStream = price
>>
>>  
>> .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
>> .withTimestampAssigner((p,timestamp) ->
>> {
>> ZoneId zoneId = ZoneId.systemDefault();
>> long epoch =
>> p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
>> System.out.println(epoch);
>>  return epoch;
>> }))
>> .keyBy(new KeySelector() {
>> @Override
>> public String getKey(Price price) throws Exception {
>> return price.getPerformanceId();
>> }
>> }).window(TumblingEventTimeWindows.of(Time.seconds(2)))
>> .process(new ProcessWindowFunction> TimeWindow>() {
>>
>> @Override
>> public void process(String s, Context context,
>> Iterable iterable, Collector collector) throws Exception {
>> System.out.println(context.window().getStart()+
>> "Current watermark: "+context.window().getEnd());
>> Price p1 = null ;
>> for(Price p : iterable)
>> {
>> System.out.println(p.toString());
>> p1= p;
>> }
>> collector.collect(p1);
>> }
>> });
>>
>>
>> priceStream.writeAsText("c:\\ab.txt");
>>
>> also data I am inputting are
>>
>> p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
>> p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
>> p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
>> p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
>> p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
>> p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
>> p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
>> p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01
>>
>> --
>> ---Regards---
>>
>>   Sagar Bandal
>>
>> This is confidential mail ,All Rights are Reserved.If you are not
>> intended receipiant please ignore this email.
>>
>>
>
> --
> ---Regards---
>
>   Sagar Bandal
>
> This is confidential mail ,All Rights are Reserved.If you are not intended
> receipiant please ignore this email.
>


-- 
---Regards---

  Sagar Bandal

This is confidential mail ,All Rights are Reserved.If you are not intended
receipiant please ignore this email.


Re: Re: 社区有人实现过Flink的MongodbSource吗?

2021-02-24 Thread Evan
好的,十分感谢,我调研一下,之前网上搜了一些资料,实现的只能批量读取,读完程序就停止了,不能一直实时的增量读取



 
发件人: Paul Lam
发送时间: 2021-02-24 17:03
收件人: user-zh
主题: Re: 社区有人实现过Flink的MongodbSource吗?
Hi,
 
Debezium 支持 MongoDB CDC[1],可以了解下。
 
[1] https://debezium.io/documentation/reference/connectors/mongodb.html
 
Best,
Paul Lam
 
> 2021年2月24日 16:23,Evan  写道:
> 
> 
> 有人完整的实现Flink的MongodbSource吗
> 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化
> 
> 
 


Re: Flink On Yarn Per Job 作业提交失败问题

2021-02-24 Thread Robin Zhang
Hi,凌战
看看hadoop环境变量是否正确设置,可以参考文档
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html#preparation

Best,
Robin


凌战 wrote
> hi,社区
> 在接口端设置用户为 hdfs 用户,在调度执行作业后,发现在/user/hdfs/.flink/application-id 目录下 存在相关包,如
> -rw-r--r--   3 hdfs supergroup   9402 2021-02-24 11:02
> /user/hdfs/.flink/application_1610671284452_0257/WordCount.jar
> -rw-r--r--   3 hdfs supergroup   1602 2021-02-24 11:09
> /user/hdfs/.flink/application_1610671284452_0257/application_1610671284452_0257-flink-conf.yaml7449295579763306480.tmp
> -rw-r--r--   3 hdfs supergroup  32629 2021-02-24 11:09 
> /user/hdfs/.flink/application_1610671284452_0257/application_1610671284452_02573784840919107496826.tmp
> -rw-r--r--   3 hdfs supergroup  110075001 2021-02-24 11:09
> /user/hdfs/.flink/application_1610671284452_0257/flink-dist_2.11-1.10.1.jar
> 
> 
> 但是报错 Could not find or load main class
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
> 发现上传文件目录的权限是  -rw-r--r-- ,不知道是不是因为权限问题导致
> 
> 
> 希望有人解惑!
> | |
> 凌战
> |
> |

> m18340872285@

> |
> 签名由网易邮箱大师定制





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 社区有人实现过Flink的MongodbSource吗?

2021-02-24 Thread Paul Lam
Hi,

Debezium 支持 MongoDB CDC[1],可以了解下。

[1] https://debezium.io/documentation/reference/connectors/mongodb.html

Best,
Paul Lam

> 2021年2月24日 16:23,Evan  写道:
> 
> 
> 有人完整的实现Flink的MongodbSource吗
> 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化
> 
> 



Re: Does the Kafka source perform retractions on Key?

2021-02-24 Thread Jan Lukavský

Hi Rex,

If I understand correctly, you are concerned about behavior of Kafka 
source in the case of compacted topic, right? If that is the case, then 
this is not directly related to Flink, Flink will expose the behavior 
defined by Kafka. You can read about it for instance here [1]. TL;TD - 
your pipeline is guaranteed to see every record written to topic (every 
single update, be it later "overwritten" or not) if it processes the 
record with latency at most 'delete.retention.ms'. This is configurable 
per topic - default 24 hours. If you want to reprocess the data later, 
your consumer might see only resulting compacted ("retracted") stream, 
and not every record actually written to the topic.


 Jan

[1] 
https://medium.com/swlh/introduction-to-topic-log-compaction-in-apache-kafka-3e4d4afd2262


On 2/24/21 3:14 AM, Rex Fenley wrote:
Apologies, forgot to finish. If the Kafka source performs its own 
retractions of old data on key (user_id) for every append it receives, 
it should resolve this discrepancy I think.


Again, is this true? Anything else I'm missing?

Thanks!


On Tue, Feb 23, 2021 at 6:12 PM Rex Fenley > wrote:


Hi,

I'm concerned about the impacts of Kafka's compactions when
sending data between running flink jobs.

For example, one job produces retract stream records in sequence of
(false, (user_id: 1, state: "california") -- retract
(true, (user_id: 1, state: "ohio")) -- append
Which is consumed by Kafka and keyed by user_id, this could end up
compacting to just
(true, (user_id: 1, state: "ohio")) -- append
If some other downstream Flink job has a filter on state ==
"california" and reads from the Kafka stream, I assume it will
miss the retract message altogether and produce incorrect results.

Is this true? How do we prevent this from happening? We need to
use compaction since all our jobs are based on CDC and we can't
just drop data after x number of days.

Thanks

-- 


Rex Fenley|Software Engineer - Mobile and Backend


Remind.com | BLOG
 | FOLLOW US
 | LIKE US




--

Rex Fenley|Software Engineer - Mobile and Backend


Remind.com | BLOG  | 
FOLLOW US  | LIKE US 





Re: [Statefun] Dynamic behavior

2021-02-24 Thread Miguel Araújo
Thanks Seth.
I understood Igal's suggestion. My concern was about maintaining a separate
service (outside flink/statefun) when this control stream might be an
incremental stream as well (think, rules in fraud detection - although this
is not a fraud detection application, but the example is good). I wouldn't
want to implement fault tolerance, checkpointing, HA, etc. myself.
I now see that I wasn't thinking a step ahead - just because it is a
separate service from statefun's point of view, it doesn't mean it can't be
implemented in flink if it turns out to be the most appropriate tool.

Thanks for all suggestions, this was definitely helpful.

Miguel

Seth Wiesman  escreveu no dia terça, 23/02/2021 à(s)
17:08:

> I don't think there is anything statefun specific here and I would follow
> Igals advice.
>
> Let's say you have a state value called `Behavior` that describes the
> behavior of an instance. There is a default behavior but any given instance
> may have a customized behavior. What I would do is the following.
>
> Create a state in the TransactionManager called `behavior` that stores the
> instance's customized behavior if it exists. When a transaction comes in,
> read the behavior state. If it exists (is not None in the case of Python)
> then use that. If not, then fall back to the default instance.
>
> The default instance can be provided one of several ways depending on the
> specifics of your use case:
>
> 1) hard-coded in the function.
> 2) dynamically loaded via a background thread as a global. so long as that
> default is immutable this is safe
> 3) dynamically loaded via the function instance on first use. stateful
> functions have strong support for making async requests so you could simply
> query the behavior for that instance on first use from a 3rd party service.
>
> Seth
>
>
> On Tue, Feb 23, 2021 at 10:55 AM Miguel Araújo 
> wrote:
>
>> Hi Seth,
>>
>> Thanks for your comment. I've seen that repository in the past and it was
>> really helpful to "validate" that this was the way to go.
>> I think my question is not being addressed there though: how could one
>> add dynamic behavior to your TransactionManager? In this case, state that
>> is available to all TransactionManager instances when they receive a
>> message of type Transaction for the first time.
>>
>> Seth Wiesman  escreveu no dia terça, 23/02/2021
>> à(s) 16:02:
>>
>>> Hey Miguel,
>>>
>>> What you are describing is exactly what is implemented in this repo. The
>>> TransactionManager function acts as an orchestrator to work with the other
>>> functions. The repo is structured as an exercise but the full solution
>>> exists on the branch `advanced-solution`.
>>>
>>> https://github.com/ververica/flink-statefun-workshop
>>>
>>> On Tue, Feb 23, 2021 at 8:34 AM Miguel Araújo 
>>> wrote:
>>>
 Another possibility I am considering is handling this in Flink using a
 broadcast and adding all the information needed to the event itself. I'm a
 little concerned about the amount of data that will be serialized and sent
 on every request though, as I'll need to include information about all
 available remote functions, for instance.

 Miguel Araújo  escreveu no dia terça, 23/02/2021
 à(s) 09:14:

> Hi Gordon, Igal,
>
> Thanks for your replies.
> PubSub would be a good addition, I have a few scenarios where that
> would be useful.
>
> However, after reading your answers I realized that your proposed
> solutions (which address the most obvious interpretation of my question) 
> do
> not necessarily solve my problem. I should have just stated what it was,
> instead of trying to propose a solution by discussing broadcast...
>
> I'm trying to implement an "orchestrator" function which, given an
> event, will trigger multiple remote function calls, aggregate their 
> results
> and eventually call yet more functions (based on a provided dependency
> graph). Hence, this orchestrator function has state per event_id and each
> function instance is short-lived (a couple seconds at most, ideally
> sub-second). The question then is not about how to modify a long-running
> function instance (which PubSub would enable), but rather how to have the
> dependency graph available to new functions.
>
> Given this, Igal's answer seems promising because we have the
> FunctionProvider instantiating a local variable and passing it down on
> every instantiation. I'm assuming there is one FunctionProvider per
> TaskManager. Is there an easy way to have the FunctionProvider receiving
> data coming from a Flink DataStream, or receiving StateFun messages?
> Otherwise, I could have it subscribe to a Kafka topic directly.
>
> I really appreciate your help.
>
> Miguel
>
> Igal Shilman  escreveu no dia segunda, 22/02/2021
> à(s) 12:09:
>
>> Hi Miguel,
>>
>> I think that there are a couple 

社区有人实现过Flink的MongodbSource吗?

2021-02-24 Thread Evan

有人完整的实现Flink的MongodbSource吗
如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化