broadcast() without arguments

2021-12-09 Thread Alexey Trenikhun
Hello,
How should the broadcast() method without arguments be used?
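
For reference, a minimal sketch of the no-argument DataStream#broadcast(): it is purely a
partitioning hint, replicating every record to all parallel instances of the downstream
operator (the pipeline below is illustrative, not from this thread):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BroadcastPartitioningExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.fromElements("a", "b", "c");

        // broadcast() without arguments only changes the partitioning:
        // every record is replicated to all parallel instances of the next operator,
        // here the 4 parallel printing sinks.
        input.broadcast().print().setParallelism(4);

        env.execute("broadcast-partitioning");
    }
}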

Thanks,
Alexey


stateSerializer(1.14.0) not compatible with previous stateSerializer(1.13.1)

2021-12-09 Thread 李诗君
I am trying to upgrade my Flink cluster from version 1.13.1 to 1.14.0. I followed
the steps below:

1. Take savepoints of the running tasks on version 1.13.1
2. Stop the tasks and upgrade the cluster to 1.14.0
3. Restore the tasks from the savepoints

and this happened:


java.lang.RuntimeException: Error while getting state
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.open(GroupAggFunction.java:119) ~[flink-table_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-fsp-connector-rksc-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:55) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_282]
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a508b39e) must not be incompatible with the old state serializer (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a508b39e).
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:225) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:148) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:132) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    ... 15 more

Unsubscribe

2021-12-09 Thread 稻草人



Re: Running a flink job with Yarn per-job mode from application code.

2021-12-09 Thread Caizhi Weng
Hi!

For Yarn jobs, the Flink web UI (which shares the same port with the REST
API) can be found by clicking the "ApplicationMaster" link on the
corresponding application page. I'm not familiar with the Yarn API, but I
guess there is some way to get this ApplicationMaster link programmatically.
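
For reference, a minimal sketch of looking that link up with the Hadoop YarnClient API; it
assumes Hadoop 2.8+ (for ApplicationId.fromString) and a yarn-site.xml on the classpath,
and the application id is a placeholder:

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class FlinkWebUiLookup {
    public static void main(String[] args) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration()); // picks up yarn-site.xml from the classpath
        yarnClient.start();

        // e.g. "application_1638990000000_0042" -- the YARN application of the Flink cluster
        ApplicationId appId = ApplicationId.fromString(args[0]);
        ApplicationReport report = yarnClient.getApplicationReport(appId);

        // the tracking URL is the ApplicationMaster link, which proxies the Flink web UI / REST API
        System.out.println(report.getTrackingUrl());

        yarnClient.stop();
    }
}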

Kamil ty wrote on Thu, Dec 9, 2021 at 23:07:

> Sorry for that. Will remember to add the mailing list in responses.
>
> The REST API approach will probably be sufficient. One more question
> regarding this. Is it possible to get the address/port of the rest api
> endpoint from the job? I see that when deploying a job to yarn the
> rest.address and rest.port keys are not set inside the configuration
> obtained with env.getConfiguration().toString();.
>
> On Thu, 9 Dec 2021 at 03:29, Caizhi Weng  wrote:
>
>> Hi!
>>
>> Please make sure to always reply to the user mailing list so that
>> everyone can see the discussion.
>>
>> You can't get the execution environment for an already running job but if
>> you want to operate on that job you can try to get its JobClient instead.
>> However this is somewhat complicated to get with Java code. If you're
>> interested see ClusterClientJobClientAdapter and its usage.
>>
>> I would also recommend using REST API to operate on an existing job. See
>> [1] for the APIs.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/
>>
>> Kamil ty wrote on Wed, Dec 8, 2021 at 21:15:
>>
>>> Thank you. I guess I will try this solution. One thing I'm unsure about is
>>> whether I will be able to get the execution environment in a function from
>>> an already running job. Will get back with a response if this works.
>>>
>>> Best regards
>>> Kamil
>>>
>>> On Wed, 8 Dec 2021, 03:20 Caizhi Weng,  wrote:
>>>
 Hi!

 So you would like to submit a yarn job with Java code, not using
 /bin/flink run?

 If that is the case, you'll need to set 'execution.target' config
 option to 'yarn-per-job'. Set this in the configuration of
 ExecutionEnvironment and execute the job with Flink API as normal.
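
For reference, a minimal sketch of the configuration described above; it assumes the
flink-yarn dependencies and a Hadoop/YARN configuration are available on the classpath,
and the pipeline itself is only a placeholder:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class YarnPerJobSubmit {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(DeploymentOptions.TARGET, "yarn-per-job"); // same as 'execution.target: yarn-per-job'

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.fromElements(1, 2, 3).print(); // placeholder pipeline
        env.execute("submitted-to-yarn-per-job");
    }
}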

 Kamil ty wrote on Tue, Dec 7, 2021 at 19:16:

> Hello all,
>
> I'm looking for a way to submit a Yarn job from another Flink job's
> application code. I can see that you can access a cluster and submit jobs
> with a RestClusterClient, but it seems Yarn per-job mode is not supported
> with it.
>
> Any suggestions would be appreciated.
>
> Best Regards
> Kamil
>



Re: Stateful functions - egress question

2021-12-09 Thread Gigio Topo


Hi,

Following your indications, it makes total sense to use Kafka as the egress and
add a stateless bulk importer on top of that.

Thanks for the clarification.

M.


> Unfortunately we don't support remote egresses right now. If you really want to
> avoid embedded modules, and you are using Kafka/Kinesis for ingress, then perhaps
> you can use Kafka/Kinesis for the egress as well, and then write a simple, almost
> stateless bulk importer that takes bulk insert commands out of Kafka (or Kinesis)
> and bulk inserts them into your database.
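
For reference, a minimal sketch of such a bulk importer, assuming a plain Kafka consumer
plus JDBC batch inserts; broker, topic, database URL and table are placeholders, not from
this thread:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BulkImporter {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");          // placeholder broker
        props.put("group.id", "bulk-importer");
        props.put("enable.auto.commit", "false");               // commit only after the batch is written
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             Connection db = DriverManager.getConnection("jdbc:postgresql://db:5432/app"); // placeholder DB
             PreparedStatement stmt = db.prepareStatement("INSERT INTO results(payload) VALUES (?)")) {
            consumer.subscribe(Collections.singletonList("statefun-egress"));               // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    stmt.setString(1, record.value());
                    stmt.addBatch();
                }
                if (records.count() > 0) {
                    stmt.executeBatch();   // bulk insert everything from this poll
                }
                consumer.commitSync();
            }
        }
    }
}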









Re: How to select an event that has max attribute

2021-12-09 Thread Guoqin Zheng
Hi Jing,

Thanks for the advice. This is very helpful.

-Guoqin

On Wed, Dec 8, 2021 at 11:52 PM Jing Zhang  wrote:

> Hi Guoqin,
> I understand the problem you are suffering.
> I'm sorry I could not find out a perfect solution on Flink 1.13.
>
> Maybe you could try to use TopN [1] instead of Window TopN by normalizing
> the event time into 5-minute buckets and adding the bucket as one of the
> partition keys. But the result is an update stream instead of an append
> stream, which means results that were already emitted might be retracted
> later. Besides, you would need to take care of state cleanup yourself.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/topn/
>
> Best,
> Jing Zhang
>
> Guoqin Zheng wrote on Thu, Dec 9, 2021 at 14:16:
>
>> Hi Jing,
>>
>> Just verified that it worked with Flink 1.14. But as you said, Flink 1.13
>> does not yet support it.
>> Other than waiting for KDA to upgrade the Flink version, is there any
>> workaround for Flink 1.13?
>>
>> Thanks,
>> -Guoqin
>>
>> On Wed, Dec 8, 2021 at 10:00 PM Guoqin Zheng 
>> wrote:
>>
>>> Hi Jing,
>>>
>>> Thanks for chiming in. This sounds great. Any chance this will work for
>>> Flink 1.13 as well, as I am using AWS KDA.
>>>
>>> Thanks,
>>> -Guoqin
>>>
>>> On Wed, Dec 8, 2021 at 7:47 PM Jing Zhang  wrote:
>>>
 Hi Guoqin,
 I guess you have misunderstood Martijn's response.
 Martijn suggested you use Window TopN. Besides, a Window TopN does not need
 to follow a Window Aggregate; since Flink 1.14 it can directly follow a
 windowing TVF. Please see the document [1] attached.
 You could try the following SQL to get the record with the max gauge: it
 orders by gauge descending and fetches only the first row.

 SELECT deviceId, locationId, gauge, window_start, window_end
 FROM (
   SELECT *,
          ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY gauge DESC) AS rownum
   FROM TABLE(
          TUMBLE(TABLE MyTable, DESCRIPTOR(readtime), INTERVAL '5' MINUTES))
 ) WHERE rownum <= 1;

 [1]
 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-topn/#window-top-n-follows-after-windowing-tvf

 Best,
 Jing Zhang




 Guoqin Zheng wrote on Thu, Dec 9, 2021 at 10:30:

> Hi Martijn,
>
> Thanks for your quick response. I tried it, but it does not seem to
> work.
>
> The problem is that I want to select fields that are not in the `GROUP
> BY`. So in my example, I can have a tumble window on `readtime`, and 
> select
> max(gauge), but I also want both `deviceId` and `locationId` of the max
> record included in the selected result. With Top-N, it does not seem to
> allow that.
>
> -Guoqin
>
> On Wed, Dec 8, 2021 at 1:22 PM Martijn Visser 
> wrote:
>
>> Hi Guoqin,
>>
>> I think you could use the Window Top-N. There's a recipe in the Flink
>> SQL Cookbook [1]. The example uses a SUM which you should change to MAX 
>> and
>> of course you change the rownum to 1 instead of 3.
>>
>> Best regards,
>>
>> Martijn
>>
>> [1]
>>
>> https://github.com/ververica/flink-sql-cookbook/blob/main/aggregations-and-analytics/11_window_top_n/11_window_top_n.md
>>
>> On Wed, Dec 8, 2021 at 19:54, Guoqin Zheng <lanson.zh...@gmail.com> wrote:
>>
>>> Hi Flink Community,
>>>
>>> I am curious what the recommended way is to select the event with a
>>> max attribute value with SQL api.
>>>
>>> For example, I have an event stream like:
>>>
>>> {
>>>deviceId,
>>>locationId
>>>gauge,
>>>readtime,  <-- eventTime
>>> }
>>>
>>> I want to figure out which device and location has the max gauge
>>> over a 5-mins window.
>>>
>>> Any advice would be greatly appreciated!
>>>
>>> Thanks!
>>> -Guoqin
>>>
>> --
>>
>> Martijn Visser | Product Manager
>>
>> mart...@ververica.com
>>
>> 
>>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>>


Re: [DISCUSS] Deprecate MapR FS

2021-12-09 Thread David Morávek
+1, agreed with Seth's reasoning. There has been no real activity in the MapR
FS module for years [1], so any remaining users should be fine using the jars
from the older Flink versions for quite some time.

[1]
https://github.com/apache/flink/commits/master/flink-filesystems/flink-mapr-fs

Best,
D.

On Thu, Dec 9, 2021 at 4:28 PM Konstantin Knauf  wrote:

> +1 (what Seth said)
>
> On Thu, Dec 9, 2021 at 4:15 PM Seth Wiesman  wrote:
>
> > +1
> >
> > I actually thought we had already dropped this FS. If anyone is still
> > relying on it in production, the file system abstraction in Flink has
> been
> > incredibly stable over the years. They should be able to use the 1.14
> MapR
> > FS with later versions of Flink.
> >
> > Seth
> >
> > On Wed, Dec 8, 2021 at 10:03 AM Martijn Visser 
> > wrote:
> >
> >> Hi all,
> >>
> >> Flink supports multiple file systems [1], which include MapR FS. MapR has
> >> not existed as a company since 2019; the technology and intellectual
> >> property have been sold to Hewlett Packard.
> >>
> >> I don't think that there's anyone who's using MapR anymore and therefore
> >> I think it would be good to deprecate this for Flink 1.15 and then
> remove
> >> it in Flink 1.16. Removing this from Flink will slightly shrink the
> >> codebase and CI runtime.
> >>
> >> I'm also cross posting this to the User mailing list, in case there's
> >> still anyone who's using MapR.
> >>
> >> Best regards,
> >>
> >> Martijn
> >>
> >> [1]
> >>
> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/overview/
> >>
> >
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
>


Re: [DISCUSS] Deprecate MapR FS

2021-12-09 Thread Konstantin Knauf
+1 (what Seth said)

On Thu, Dec 9, 2021 at 4:15 PM Seth Wiesman  wrote:

> +1
>
> I actually thought we had already dropped this FS. If anyone is still
> relying on it in production, the file system abstraction in Flink has been
> incredibly stable over the years. They should be able to use the 1.14 MapR
> FS with later versions of Flink.
>
> Seth
>
> On Wed, Dec 8, 2021 at 10:03 AM Martijn Visser 
> wrote:
>
>> Hi all,
>>
>> Flink supports multiple file systems [1], which include MapR FS. MapR has
>> not existed as a company since 2019; the technology and intellectual
>> property have been sold to Hewlett Packard.
>>
>> I don't think that there's anyone who's using MapR anymore and therefore
>> I think it would be good to deprecate this for Flink 1.15 and then remove
>> it in Flink 1.16. Removing this from Flink will slightly shrink the
>> codebase and CI runtime.
>>
>> I'm also cross posting this to the User mailing list, in case there's
>> still anyone who's using MapR.
>>
>> Best regards,
>>
>> Martijn
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/overview/
>>
>

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: [DISCUSS] Deprecate MapR FS

2021-12-09 Thread Seth Wiesman
+1

I actually thought we had already dropped this FS. If anyone is still
relying on it in production, the file system abstraction in Flink has been
incredibly stable over the years. They should be able to use the 1.14 MapR
FS with later versions of Flink.

Seth

On Wed, Dec 8, 2021 at 10:03 AM Martijn Visser 
wrote:

> Hi all,
>
> Flink supports multiple file systems [1], which include MapR FS. MapR has
> not existed as a company since 2019; the technology and intellectual
> property have been sold to Hewlett Packard.
>
> I don't think that there's anyone who's using MapR anymore and therefore I
> think it would be good to deprecate this for Flink 1.15 and then remove it
> in Flink 1.16. Removing this from Flink will slightly shrink the codebase
> and CI runtime.
>
> I'm also cross posting this to the User mailing list, in case there's
> still anyone who's using MapR.
>
> Best regards,
>
> Martijn
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/overview/
>


Re: Running a flink job with Yarn per-job mode from application code.

2021-12-09 Thread Kamil ty
Sorry for that. Will remember to add the mailing list in responses.

The REST API approach will probably be sufficient. One more question
regarding this. Is it possible to get the address/port of the rest api
endpoint from the job? I see that when deploying a job to yarn the
rest.address and rest.port keys are not set inside the configuration
obtained with env.getConfiguration().toString();.
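
For reference, once the REST endpoint is known, querying it needs nothing more than a plain
HTTP client; a minimal sketch (host and port are placeholders):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class RestApiProbe {
    public static void main(String[] args) throws Exception {
        // jobmanager host and port are placeholders; /jobs/overview is from the Flink REST API docs
        URL url = new URL("http://jobmanager-host:8081/jobs/overview");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("GET");

        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
            reader.lines().forEach(System.out::println); // JSON overview of all jobs
        }
    }
}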

On Thu, 9 Dec 2021 at 03:29, Caizhi Weng  wrote:

> Hi!
>
> Please make sure to always reply to the user mailing list so that everyone
> can see the discussion.
>
> You can't get the execution environment for an already running job but if
> you want to operate on that job you can try to get its JobClient instead.
> However this is somewhat complicated to get with Java code. If you're
> interested see ClusterClientJobClientAdapter and its usage.
>
> I would also recommend using REST API to operate on an existing job. See
> [1] for the APIs.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/
>
> Kamil ty wrote on Wed, Dec 8, 2021 at 21:15:
>
>> Thank you. I guess I will try this solution. One thing I'm unsure about is
>> whether I will be able to get the execution environment in a function from
>> an already running job. Will get back with a response if this works.
>>
>> Best regards
>> Kamil
>>
>> On Wed, 8 Dec 2021, 03:20 Caizhi Weng,  wrote:
>>
>>> Hi!
>>>
>>> So you would like to submit a yarn job with Java code, not using
>>> /bin/flink run?
>>>
>>> If that is the case, you'll need to set 'execution.target' config option
>>> to 'yarn-per-job'. Set this in the configuration of ExecutionEnvironment
>>> and execute the job with Flink API as normal.
>>>
>>> Kamil ty wrote on Tue, Dec 7, 2021 at 19:16:
>>>
 Hello all,

 I'm looking for a way to submit a Yarn job from another Flink job's
 application code. I can see that you can access a cluster and submit jobs
 with a RestClusterClient, but it seems Yarn per-job mode is not supported
 with it.

 Any suggestions would be appreciated.

 Best Regards
 Kamil

>>>


Re: Stateful functions module configurations (module.yaml) per deployment environment

2021-12-09 Thread Igal Shilman
Hello Deniz,

Looking at /flink/usrlib and the way it is expected to be used, Flink will
only pick up .jar files and include them in the classpath, so unfortunately
your module.yaml is being excluded.
If you just want to make it work and get on with your day, you can simply
place module.yaml in a separate JAR; otherwise keep on reading :-)

I've created a branch [1] that supports providing a custom name for the
module.yaml. If you are comfortable with building this branch and trying it
out, I can go forward with adding this to StateFun, as I believe others
might need similar functionality.

To make it work you need:

1. Follow @Fabian Paul's advice and upload your custom module.yaml as you
did before; you can now also rename it to whatever name you want, for
example prod.yaml.
2. This file will appear at /flink/usrlib/prod.yaml
3. You would also need to specify this file by adding the following to your
flink-conf.yaml:

statefun.remote.module-name: /flink/usrlib/prod.yaml

4. At the bottom of this page [2] you can see a full example, including how
to add additional Flink configuration via the (flinkConfiguration) property.

I hope this helps,
Igal.

[1] https://github.com/igalshilman/flink-statefun/tree/custom_module
[2]
https://docs.ververica.com/user_guide/application_operations/deployments/index.html#deployment-defaults


On Thu, Dec 9, 2021 at 12:22 PM Deniz Koçak  wrote:

> Hi Fabian,
>
> Thanks for that solution. I've removed the module.yaml file from the
> jar, assuming that it should be fetched from s3 and used by the
> job. I've tried this on our job, but it still seems to be failing.
>
> From the logs, the module.yaml file seems to be fetched from the s3 bucket.
> 
> com.ververica.platform.artifactfetcher.ArtifactFetcherEntryPoint -
> Finished fetching
> s3://rttk8s-nxt-v2/vvp/artifacts/namespaces/default/module.yaml into
> /flink/usrlib/module.yaml
> 
>
> However we got that exception below:
>
> 
> Caused by: java.lang.IllegalStateException: There are no ingress
> defined. at
> org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
> ~[?:?]
> 
>
> Please let me know if you need further information. Thanks again for your
> help.
>
> Deniz
>
> On Wed, Dec 8, 2021 at 1:20 PM Fabian Paul  wrote:
> >
> > Hi Deniz,
> >
> > Great to hear from someone using Ververica Platform with StateFun.
> > When deploying your job you can specify `additionalConfigurations`[1]
> > that are also pulled and put into the classpath.
> >
> > Hopefully, that is suitable for your scenario.
> >
> > Best,
> > Fabian
> >
> > [1]
> https://docs.ververica.com/user_guide/application_operations/deployments/artifacts.html?highlight=additionaldependencies
> >
> > On Fri, Dec 3, 2021 at 4:51 PM Deniz Koçak  wrote:
> > >
> > > Hi Igal,
> > >
> > > We are using official images from Ververica as the Flink installation.
> > > Actually, I was hoping to specify the names of the files to use at
> > > runtime via `mainArgs` in the deployment configuration (or in some
> > > other way). That way we could specify the target yaml files,
> > > but I think this is not possible?
> > >
> > > ===
> > > kind: JAR
> > > mainArgs: '--active-profile nxt'
> > > ===
> > >
> > > Therefore, it's easier to use single jar in our pipelines instead of
> > > creating a different jar file for each env. (at least for development
> > > and production).
> > >
> > > For solution 2, you refer flink distro. , like /flink/lib folder in
> > > the official Docker image?
> > >
> > > Thanks,
> > > Deniz
> > >
> > > On Fri, Dec 3, 2021 at 3:06 PM Igal Shilman  wrote:
> > > >
> > > > Hi Deniz,
> > > >
> > > > StateFun would be looking for module.yaml(s) in the classpath.
> > > > If you are submitting the job to an existing Flink cluster this
> really means that it needs to be either:
> > > > 1. packaged with the jar (like you are already doing)
> > > > 2. be present at the classpath, this means that you can place your
> module.yaml at the /lib directory of your Flink installation, I suppose
> that you have different installations in different environments.
> > > >
> > > > I'm not aware of a way to submit any additional files with the jar
> via the flink cli, but perhaps someone else can chime in :-)
> > > >
> > > > Cheers,
> > > > Igal.
> > > >
> > > >
> > > > On Thu, Dec 2, 2021 at 3:29 PM Deniz Koçak 
> wrote:
> > > >>
> > > >> Hi,
> > > >>
> > > >> We have a simple stateful-function job, consuming from Kafka,
> calling
> > > >> an HTTP endpoint (on AWS via an Elastic Load Balancer) and
> publishing
> > > >> the result back via Kafka again.
> > > >>
> > > >> * We created a jar file to be deployed on a standalone cluster (it's
> > > >> not a docker Image), therefore we add `statefun-flink-distribution`
> > > >> version 3.0.0 as a dependency in that jar file.
> > > >> * Entry class in our job configuration is
> > > >> 

Hybrid Source with Parquet Files from GCS + KafkaSource

2021-12-09 Thread Meghajit Mazumdar
Hello,

We have a requirement as follows:

We want to stream events from 2 sources: Parquet files stored in a GCS
Bucket, and a Kafka topic.
With the release of Hybrid Source in Flink 1.14, we were able to construct
a Hybrid Source which produces events from two sources: a FileSource
which reads data from a locally saved Parquet File, and a KafkaSource
consuming events from a remote Kafka broker.

I was wondering whether, instead of using a local Parquet file, it is
possible to stream the file directly from a GCS bucket and construct a
FileSource out of it at runtime? The Parquet files are quite big and it's a
bit expensive to download them.

Does Flink have such functionality? Or has anyone come across such a
use case previously? I would greatly appreciate some help on this.
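
For reference, a minimal sketch of wiring a remote path into a HybridSource. It does not
answer the chunking question; it assumes a GCS filesystem implementation (for example the
Hadoop gcs-connector) is available to Flink, uses a simple text format as a stand-in for
the Parquet format, and bucket, broker and topic names are placeholders:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GcsThenKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A gs:// path should behave like any other Flink path, provided a GCS
        // filesystem implementation is on the classpath and configured.
        FileSource<String> fileSource =
            FileSource.forRecordStreamFormat(
                    new TextLineFormat(), new Path("gs://my-bucket/events/")) // placeholder bucket
                .build();

        KafkaSource<String> kafkaSource =
            KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")                            // placeholder broker
                .setTopics("events")                                          // placeholder topic
                .setGroupId("hybrid-reader")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // First drain the files, then switch over to Kafka.
        HybridSource<String> hybrid =
            HybridSource.<String>builder(fileSource).addSource(kafkaSource).build();

        env.fromSource(hybrid, WatermarkStrategy.noWatermarks(), "gcs-then-kafka").print();
        env.execute();
    }
}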

Looking forward to hearing from you.

Thanks,
Megh


Re: Stateful functions module configurations (module.yaml) per deployment environment

2021-12-09 Thread Deniz Koçak
Hi Fabian,

Thanks for that solution. I've removed the module.yaml file from the
jar, assuming that it should be fetched from s3 and used by the
job. I've tried this on our job, but it still seems to be failing.

From the logs, the module.yaml file seems to be fetched from the s3 bucket.

com.ververica.platform.artifactfetcher.ArtifactFetcherEntryPoint -
Finished fetching
s3://rttk8s-nxt-v2/vvp/artifacts/namespaces/default/module.yaml into
/flink/usrlib/module.yaml


However we got that exception below:


Caused by: java.lang.IllegalStateException: There are no ingress
defined. at 
org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
~[?:?]


Please let me know if you need further information. Thanks again for your help.

Deniz

On Wed, Dec 8, 2021 at 1:20 PM Fabian Paul  wrote:
>
> Hi Deniz,
>
> Great to hear from someone using Ververica Platform with StateFun.
> When deploying your job you can specify `additionalConfigurations`[1]
> that are also pulled and put into the classpath.
>
> Hopefully, that is suitable for your scenario.
>
> Best,
> Fabian
>
> [1] 
> https://docs.ververica.com/user_guide/application_operations/deployments/artifacts.html?highlight=additionaldependencies
>
> On Fri, Dec 3, 2021 at 4:51 PM Deniz Koçak  wrote:
> >
> > Hi Igal,
> >
> > We are using official images from Ververica as the Flink installation.
> > Actually, I was hoping to specify the names of the files to use at
> > runtime via `mainArgs` in the deployment configuration (or in some
> > other way). That way we could specify the target yaml files,
> > but I think this is not possible?
> >
> > ===
> > kind: JAR
> > mainArgs: '--active-profile nxt'
> > ===
> >
> > Therefore, it's easier to use single jar in our pipelines instead of
> > creating a different jar file for each env. (at least for development
> > and production).
> >
> > For solution 2, you refer flink distro. , like /flink/lib folder in
> > the official Docker image?
> >
> > Thanks,
> > Deniz
> >
> > On Fri, Dec 3, 2021 at 3:06 PM Igal Shilman  wrote:
> > >
> > > Hi Deniz,
> > >
> > > StateFun would be looking for module.yaml(s) in the classpath.
> > > If you are submitting the job to an existing Flink cluster this really 
> > > means that it needs to be either:
> > > 1. packaged with the jar (like you are already doing)
> > > 2. be present at the classpath, this means that you can place your 
> > > module.yaml at the /lib directory of your Flink installation, I suppose 
> > > that you have different installations in different environments.
> > >
> > > I'm not aware of a way to submit any additional files with the jar via 
> > > the flink cli, but perhaps someone else can chime in :-)
> > >
> > > Cheers,
> > > Igal.
> > >
> > >
> > > On Thu, Dec 2, 2021 at 3:29 PM Deniz Koçak  wrote:
> > >>
> > >> Hi,
> > >>
> > >> We have a simple stateful-function job, consuming from Kafka, calling
> > >> an HTTP endpoint (on AWS via an Elastic Load Balancer) and publishing
> > >> the result back via Kafka again.
> > >>
> > >> * We created a jar file to be deployed on a standalone cluster (it's
> > >> not a docker Image), therefore we add `statefun-flink-distribution`
> > >> version 3.0.0 as a dependency in that jar file.
> > >> * Entry class in our job configuration is
> > >> `org.apache.flink.statefun.flink.core.StatefulFunctionsJob` and we
> > >> simply keep a single module.yaml file in resources folder for the
> > >> module configuration.
> > >>
> > >> My question here is, we would like to deploy that jar to different
> > >> environments (dev. and prod.) and not sure how we can pass different
> > >> module configurations (module.yaml or module_nxt.yaml/module_prd.yaml)
> > >> to the job during startup without creating separate jar files for
> > >> different environments?
> > >>
> > >> Thanks,
> > >> Deniz


Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-09 Thread Ayush Chauhan
My use case is that as soon as the Avro message version changes, I want
to reload the job graph so that I can update the downstream Iceberg table.

The Iceberg FlinkSink takes the table schema at job start and it cannot be
updated at runtime. So I want to trigger a graceful shutdown and restart
the job.

Can I reload the job graph to achieve that?



On Wed, Dec 8, 2021 at 8:11 PM Arvid Heise  wrote:

> Hi Ayush,
>
> DeserializationSchema.isEndOfStream was only ever supported by Kafka. For
> the new Kafka source, the recommended way is to use the bounded mode like this:
>
> KafkaSource<String> source =
>     KafkaSource.<String>builder()
>         ...
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setBounded(OffsetsInitializer.latest())
>         .build();
>
> You can implement your own OffsetsInitializer or use a provided one.
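
For reference, a small sketch of some of the provided initializers; topic, broker, group id
and offsets are placeholders, not from this thread:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.common.TopicPartition;

public class BoundedKafkaSourceSketch {
    public static KafkaSource<String> build() {
        // explicit stop offsets per partition; topic, partition and offset values are placeholders
        Map<TopicPartition, Long> stopOffsets = new HashMap<>();
        stopOffsets.put(new TopicPartition("events", 0), 4_000L);

        return KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")                          // placeholder broker
                .setTopics("events")
                .setGroupId("bounded-reader")
                .setStartingOffsets(OffsetsInitializer.committedOffsets())  // or earliest(), timestamp(...)
                .setBounded(OffsetsInitializer.offsets(stopOffsets))        // the source ends at these offsets
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}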
>
> On Wed, Dec 8, 2021 at 9:19 AM Hang Ruan  wrote:
>
>> There is no way to end the kafka stream from the deserializer.
>>
>> When would you want to end the stream? Could you explain why you need to
>> end the kafka stream without using the offset?
>>
>> Ayush Chauhan wrote on Wed, Dec 8, 2021 at 15:29:
>>
>>>
>>> https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69
>>>
>>>
>>>
>>> On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger 
>>> wrote:
>>>
 Hi Ayush,

 I couldn't find the documentation you've mentioned. Can you send me a
 link to it?

 On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan 
 wrote:

> Hi,
>
> Can you please let me know the alternatives to isEndOfStream(), as
> according to the docs this method will no longer be used to determine the
> end of the stream.
>
> --
>  Ayush Chauhan
>  Data Platform
>  [image: mobile-icon]  +91 9990747111
>
>
> This email is intended only for the person or the entity to whom it is
> addressed. If you are not the intended recipient, please delete this email
> and contact the sender.
>

>>>
>>> --
>>>  Ayush Chauhan
>>>  Data Platform
>>>  [image: mobile-icon]  +91 9990747111
>>>
>>>
>>> This email is intended only for the person or the entity to whom it is
>>> addressed. If you are not the intended recipient, please delete this email
>>> and contact the sender.
>>>
>>

-- 
 Ayush Chauhan
 Data Platform
 [image: mobile-icon]  +91 9990747111

-- 

This email is intended only for the person or the entity to whom it is
addressed. If you are not the intended recipient, please delete this email
and contact the sender.



flink1.10.0 - RestClusterClient - error when cancelling a job

2021-12-09 Thread nicygan

Dear all:
As the subject says, when I call RestClusterClient#cancel(JobID jobId) to cancel a job,
I cannot get() a result, although the job does stop normally.

Calling future.get() fails with the following error:
Number of retries has been exhausted.

Calling future.get(10, TimeUnit.SECONDS) fails with a timeout.

Calling #cancelWithSavepoint(...) and #stopWithSavepoint(...) works fine: the result is
returned normally and no error is thrown.

So far I have found that
1.10.0 has this problem,
1.14.0 does not.


The job runs on a CDH YARN cluster, version 2.6.0.
Deployment mode: per-job.

The code is as follows:
try (ClusterClient<?> clusterClient = new RestClusterClient<>(configuration, clusterId)) {
    clusterClient
        .cancel(jobId)
        .get(20, TimeUnit.SECONDS);
} catch (Exception e) {
    // ...
}

Does anyone know how to solve this problem?
Many thanks!


FileSource with Parquet Format - parallelism level

2021-12-09 Thread Krzysztof Chmielewski
Hi,
can I have a file DataStream source that works with the Parquet format and
has a parallelism level higher than one?

Is it possible to read a Parquet file in chunks by multiple threads?

Regards,
Krzysztof Chmielewski


Re: Re: flink sql support for fine-grained state configuration

2021-12-09 Thread Yun Tang
Hi,

If you can implement your own pre-compilation IDE that translates the SQL
statements into a jobgraph, and then manually configure each operator of the
jobgraph in that IDE, you should be able to achieve your goal (possibly
combined with the fine-grained scheduling mode).

Best,
Yun Tang

From: gygz...@163.com 
Sent: Thursday, December 9, 2021 16:14
To: user-zh 
Subject: Reply: Re: flink sql support for fine-grained state configuration

Hi Yun Tang

Thanks for your reply. During our investigation we also found that, as you said, the
generated plans can differ a lot.

However, the TTL that takes effect for each operator is passed in through the StreamPlanner
when an ExecNode is translated into its corresponding Transformation; the TableConfig
properties contain the global TTL.

During the translation of each ExecNode, translateToPlanInternal((PlannerBase) planner)
uses this TTL to generate the corresponding operator.

So we are considering whether, at the stage where each Transformation is generated, we could
first modify the TTL setting in TableConfig and then call the method that translates each
ExecNode into an operator, thereby achieving Transformation-level TTL control. This setting
would be exposed to platform users and identified by the Transformation id. Could you give
us some advice?




gygz...@163.com

From: Yun Tang
Sent: 2021-12-09 10:57
To: user-zh
Subject: Re: flink sql support for fine-grained state configuration
Hi,

I think this is a very good requirement. For the DataStream and Python APIs, state TTL is
configured per state through the API, so your requirement can be met directly. For SQL,
however, the same SQL statement may produce very different execution plans under different
optimizers, so it is hard to configure a TTL for the state inside one specific operator.
One possible approach would be to add SQL optimization hints, so that the join statement
and the groupBy count statement in your example get different TTLs, but Flink SQL does not
support that yet.


Best,
Yun Tang
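
For reference, a minimal sketch of the difference described above, assuming a recent Flink
version (1.13+): in the DataStream API the TTL is attached to an individual state
descriptor, while in the Table/SQL API idle state retention is a single job-wide setting;
the names are illustrative:

import java.time.Duration;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TtlScopeComparison {
    public static void main(String[] args) {
        // DataStream API: the TTL is attached to one specific state descriptor.
        ValueStateDescriptor<Long> countState = new ValueStateDescriptor<>("count", Long.class);
        countState.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1)).build());

        // Table / SQL API: idle state retention is one global, job-wide setting.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.getConfig().setIdleStateRetention(Duration.ofDays(1));
    }
}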


From: gygz...@163.com 
Sent: Tuesday, December 7, 2021 18:38
To: user-zh 
Subject: flink sql support for fine-grained state configuration

Hi all

In our production environment we found that configuring a state TTL in SQL makes that TTL
take effect globally.

Suppose I have a SQL statement like the following:

select count(1), region from (select * from A join B on a.uid = b.uid) group by region

If I configure a global TTL, the state of the count GroupAggFunction gets expired as well;
for example, the accumulated count is cleared after one day.

If I don't configure a TTL, the state of the regular join keeps growing.

This is one scenario; it is just an example.

The main question is: how can we configure a TTL for only part of the state in a SQL job,
or configure a different TTL for each SQL statement within the same job?



gygz...@163.com


Reply: Re: flink sql support for fine-grained state configuration

2021-12-09 Thread gygz...@163.com
Hi Yun Tang

Thanks for your reply. During our investigation we also found that, as you said, the
generated plans can differ a lot.

However, the TTL that takes effect for each operator is passed in through the StreamPlanner
when an ExecNode is translated into its corresponding Transformation; the TableConfig
properties contain the global TTL.

During the translation of each ExecNode, translateToPlanInternal((PlannerBase) planner)
uses this TTL to generate the corresponding operator.

So we are considering whether, at the stage where each Transformation is generated, we could
first modify the TTL setting in TableConfig and then call the method that translates each
ExecNode into an operator, thereby achieving Transformation-level TTL control. This setting
would be exposed to platform users and identified by the Transformation id. Could you give
us some advice?




gygz...@163.com
 
From: Yun Tang
Sent: 2021-12-09 10:57
To: user-zh
Subject: Re: flink sql support for fine-grained state configuration
Hi,

I think this is a very good requirement. For the DataStream and Python APIs, state TTL is
configured per state through the API, so your requirement can be met directly. For SQL,
however, the same SQL statement may produce very different execution plans under different
optimizers, so it is hard to configure a TTL for the state inside one specific operator.
One possible approach would be to add SQL optimization hints, so that the join statement
and the groupBy count statement in your example get different TTLs, but Flink SQL does not
support that yet.

Best,
Yun Tang
 

From: gygz...@163.com 
Sent: Tuesday, December 7, 2021 18:38
To: user-zh 
Subject: flink sql support for fine-grained state configuration
 
Hi all
 
In our production environment we found that configuring a state TTL in SQL makes that TTL
take effect globally.

Suppose I have a SQL statement like the following:

select count(1), region from (select * from A join B on a.uid = b.uid) group by region

If I configure a global TTL, the state of the count GroupAggFunction gets expired as well;
for example, the accumulated count is cleared after one day.

If I don't configure a TTL, the state of the regular join keeps growing.

This is one scenario; it is just an example.

The main question is: how can we configure a TTL for only part of the state in a SQL job,
or configure a different TTL for each SQL statement within the same job?
 
 
 
gygz...@163.com