Re: create savepoint on bounded source in streaming mode

2022-01-25 Thread Dawid Wysakowicz

Hi Shawn,

You could also take a look at the hybrid source[1]

Best,

Dawid

[1]https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/
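For reference, a minimal sketch of what a hybrid source pipeline could look like with the 1.14-era APIs (the paths, topic, and broker address are made up, and TextLineFormat was renamed TextLineInputFormat in later releases):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HybridSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded part: replay the historical files first.
        FileSource<String> fileSource =
                FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("s3://bucket/history/"))
                        .build();

        // Unbounded part: continue from Kafka once the files are exhausted.
        long switchTimestamp = 0L; // e.g. derived from the newest historical file
        KafkaSource<String> kafkaSource =
                KafkaSource.<String>builder()
                        .setBootstrapServers("broker:9092")
                        .setTopics("events")
                        .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        HybridSource<String> hybridSource =
                HybridSource.builder(fileSource)
                        .addSource(kafkaSource)
                        .build();

        DataStream<String> events =
                env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "hybrid-source");

        events.print();
        env.execute("hybrid-source-sketch");
    }
}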

On 26/01/2022 08:39, Guowei Ma wrote:

Hi Shawn
Currently Flink cannot trigger a savepoint at the end of the input. An
alternative might be to develop a customized source that triggers a
savepoint once it notices that all input splits have been handled.

Or you could look at the State Processor API [1], which might be helpful.

Thanks for sharing, but I have another small question:
I think you need to process all the historical events to rebuild the
correct state, so there might be no gain even if you periodically
create a savepoint. Why do you need to "rebuild" the state
periodically? Am I missing something?


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/


Best,
Guowei


On Wed, Jan 26, 2022 at 2:17 PM Shawn Du  wrote:

     Our application is stateful: processing live events depends
on the state, but for various reasons we need to rebuild the state,
and it would be very costly to replay all the data.
     Our historical events are stored in S3, so we want to
create states/savepoints periodically so that we can rebuild the
state from a given point. We call this a bootstrap process.
     Any ideas?

     Thanks.

--
Sender:Guowei Ma 
Sent At:2022 Jan. 26 (Wed.) 14:04
Recipient:Shawn Du 
Cc:user 
Subject:Re: create savepoint on bounded source in streaming mode

Hi, Shawn
I think Flink does not support this mechanism yet.
Would you like to share the scenario in which you need this
savepoint at the end of the bounded input?
Best,
Guowei


On Wed, Jan 26, 2022 at 1:50 PM Shawn Du
 wrote:
Hi experts,

Assume I have several files and I want to replay these files in
order in streaming mode and create a savepoint when the files reach
the end. Is that possible?
I wrote a simple test app, and the job finishes when the source reaches
the end, so I have no chance to create a savepoint. Please help.

Thanks
Shawn






Re: create savepoint on bounded source in streaming mode

2022-01-25 Thread Guowei Ma
Hi Shawn
Currently Flink cannot trigger a savepoint at the end of the input. An
alternative might be to develop a customized source that triggers a
savepoint once it notices that all input splits have been handled.
Or you could look at the State Processor API [1], which might be helpful.

Thanks for sharing, but I have another small question:
I think you need to process all the historical events to rebuild the
correct state, so there might be no gain even if you periodically create a
savepoint. Why do you need to "rebuild" the state periodically? Am I
missing something?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/
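For the bootstrap use case described in this thread, the State Processor API can build a savepoint offline from the historical events in S3, and the streaming job then starts from that savepoint. Below is a minimal sketch against the 1.13/1.14 DataSet-based API; the Event type, the parsing, the operator uid "enrichment", the state name, and the paths are all hypothetical:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

public class BootstrapSavepointJob {

    /** Hypothetical event type replayed from S3. */
    public static class Event {
        public String key;
        public long amount;
    }

    /** Writes keyed state in the same shape the streaming operator expects. */
    public static class StateBootstrapper extends KeyedStateBootstrapFunction<String, Event> {
        private transient ValueState<Long> total;

        @Override
        public void open(Configuration parameters) {
            total = getRuntimeContext().getState(new ValueStateDescriptor<>("total", Long.class));
        }

        @Override
        public void processElement(Event value, Context ctx) throws Exception {
            Long current = total.value();
            total.update(current == null ? value.amount : current + value.amount);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read and parse the historical events from S3 in whatever way fits your format.
        DataSet<Event> history =
                env.readTextFile("s3://bucket/history/")
                        .map(line -> {
                            Event e = new Event();
                            // ... parse the line into e ...
                            return e;
                        })
                        .returns(Event.class);

        BootstrapTransformation<Event> transformation =
                OperatorTransformation.bootstrapWith(history)
                        .keyBy(e -> e.key)
                        .transform(new StateBootstrapper());

        Savepoint.create(new HashMapStateBackend(), 128)    // max parallelism of the streaming job
                .withOperator("enrichment", transformation) // uid of the stateful operator in the streaming job
                .write("s3://bucket/savepoints/bootstrap");

        env.execute("bootstrap-savepoint");
    }
}

The streaming job can then be submitted with flink run -s pointing at the written savepoint path.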

Best,
Guowei


On Wed, Jan 26, 2022 at 2:17 PM Shawn Du  wrote:

>
>Our application is stateful: processing live events depends on the
> state, but for various reasons we need to rebuild the state. It would be very
> costly to replay all the data.
>Our historical events are stored in S3, so we want to create
> states/savepoints periodically so that we can rebuild the state from a
> given point. We call this a bootstrap process.
>Any ideas?
>
>Thanks.
>
> --
> Sender:Guowei Ma 
> Sent At:2022 Jan. 26 (Wed.) 14:04
> Recipient:Shawn Du 
> Cc:user 
> Subject:Re: create savepoint on bounded source in streaming mode
>
> Hi, Shawn
> I think Flink does not support this mechanism yet.
> Would you like to share the scenario in which you need this savepoint at
> the end of the bounded input?
> Best,
> Guowei
>
>
> On Wed, Jan 26, 2022 at 1:50 PM Shawn Du  wrote:
> Hi experts,
>
> Assume I have several files and I want to replay these files in order in
> streaming mode and create a savepoint when the files reach the end. Is that
> possible?
> I wrote a simple test app, and the job finishes when the source reaches the
> end, so I have no chance to create a savepoint. Please help.
>
> Thanks
> Shawn
>
>
>


Re: create savepoint on bounded source in streaming mode

2022-01-25 Thread Shawn Du

   Our application is stateful: processing live events depends on the
state, but for various reasons we need to rebuild the state, and it would be very
costly to replay all the data.
   Our historical events are stored in S3, so we want to create
states/savepoints periodically so that we can rebuild the state from a given point.
We call this a bootstrap process.
   Any ideas?

   Thanks.
--
Sender:Guowei Ma 
Sent At:2022 Jan. 26 (Wed.) 14:04
Recipient:Shawn Du 
Cc:user 
Subject:Re: create savepoint on bounded source in streaming mode

Hi, Shawn
I think Flink does not support this mechanism yet. 
Would you like to share the scenario in which you need this savepoint at the 
end of the bounded input?
Best,
Guowei

On Wed, Jan 26, 2022 at 1:50 PM Shawn Du  wrote:
Hi experts,

Assume I have several files and I want to replay these files in order in streaming
mode and create a savepoint when the files reach the end. Is that possible?
I wrote a simple test app, and the job finishes when the source reaches the end,
so I have no chance to create a savepoint. Please help.

Thanks
Shawn



Re: create savepoint on bounded source in streaming mode

2022-01-25 Thread Guowei Ma
Hi, Shawn
I think Flink does not support this mechanism yet.
Would you like to share the scenario in which you need this savepoint at
the end of the bounded input?
Best,
Guowei


On Wed, Jan 26, 2022 at 1:50 PM Shawn Du  wrote:

> Hi experts,
>
> Assume I have several files and I want to replay these files in order in
> streaming mode and create a savepoint when the files reach the end. Is that
> possible?
> I wrote a simple test app, and the job finishes when the source reaches the
> end, so I have no chance to create a savepoint. Please help.
>
> Thanks
> Shawn
>


create savepoint on bounded source in streaming mode

2022-01-25 Thread Shawn Du
Hi experts,

Assume I have several files and I want to replay these files in order in streaming
mode and create a savepoint when the files reach the end. Is that possible?
I wrote a simple test app, and the job finishes when the source reaches the end,
so I have no chance to create a savepoint. Please help.

Thanks
Shawn

Re: How can I smooth out the peaks and troughs in Flink's output?

2022-01-25 Thread yidan zhao
If you don't need uniform window boundaries (e.g. 0-10, 10-20), you could derive the window offset from the key. Randomizing the window offset this way smooths the output.
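The built-in TumblingEventTimeWindows.of(size, offset) applies one offset to the whole job, so varying the offset per key needs a custom window assigner. A rough sketch of that idea (the MyEvent type and its key field are hypothetical):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.util.Collection;
import java.util.Collections;

/** Tumbling event-time windows whose start offset is derived from the record's key. */
public class KeyShiftedTumblingWindows extends WindowAssigner<Object, TimeWindow> {

    /** Hypothetical event type; "key" is whatever field the stream is keyed by. */
    public static class MyEvent {
        public String key;
        public long value;
    }

    private final long size;

    private KeyShiftedTumblingWindows(long sizeMillis) {
        this.size = sizeMillis;
    }

    public static KeyShiftedTumblingWindows of(Time size) {
        return new KeyShiftedTumblingWindows(size.toMilliseconds());
    }

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        // Stable per-key offset in [0, size), so different keys fire at different times.
        long offset = Math.floorMod(((MyEvent) element).key.hashCode(), size);
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}

Usage would be keyedStream.window(KeyShiftedTumblingWindows.of(Time.minutes(10))).aggregate(...), with the caveat that window boundaries then differ per key.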

Paul Lam wrote on Wed, Jan 26, 2022 at 10:18:

> Hi,
>
> For a DataStream application, the simplest approach is to add a throttle operator in front of the sink, e.g. with Guava RateLimiter.
>
> For a SQL application, you would probably have to implement a UDF to do this.
>
> Best,
> Paul Lam
>
> > On Jan 26, 2022, at 02:11, Jing wrote:
> >
> > Hi Flink Chinese community,
> >
> > I've run into the following problem: my database has a write throttle, and my Flink
> > app does an aggregation over a 10-minute window. As a result, a very large burst of
> > write requests arrives every 10 minutes, which sometimes brings down the database sink.
> > Is there a way to spread these writes evenly across the 10 minutes?
> >
> >
> > Thanks,
> > Jing
>
>


Re: How can I smooth out the peaks and troughs in Flink's output?

2022-01-25 Thread Paul Lam
Hi,

For a DataStream application, the simplest approach is to add a throttle operator in front of the sink, e.g. with Guava RateLimiter.

For a SQL application, you would probably have to implement a UDF to do this.

Best,
Paul Lam
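For the DataStream route, here is a minimal sketch of such a throttle operator using plain Guava's RateLimiter (the Guava dependency is assumed, and the class and variable names are illustrative). Note that the permitted rate applies per parallel subtask, and that blocking here simply back-pressures the upstream window operator instead of bursting into the sink:

import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

/** Blocks each record until a permit is available, smoothing the write rate towards the sink. */
public class ThrottleMap<T> extends RichMapFunction<T, T> {

    private final double permitsPerSecondPerSubtask;
    private transient RateLimiter limiter;

    public ThrottleMap(double permitsPerSecondPerSubtask) {
        this.permitsPerSecondPerSubtask = permitsPerSecondPerSubtask;
    }

    @Override
    public void open(Configuration parameters) {
        // RateLimiter is not serializable, so create it per subtask in open().
        limiter = RateLimiter.create(permitsPerSecondPerSubtask);
    }

    @Override
    public T map(T value) {
        limiter.acquire(); // waits until a permit is available instead of bursting into the sink
        return value;
    }
}

// usage: aggregated.map(new ThrottleMap<>(totalWritesPerSecond / sinkParallelism)).addSink(dbSink);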

> On Jan 26, 2022, at 02:11, Jing wrote:
>
> Hi Flink Chinese community,
>
> I've run into the following problem: my database has a write throttle, and my Flink
> app does an aggregation over a 10-minute window. As a result, a very large burst of
> write requests arrives every 10 minutes, which sometimes brings down the database sink.
> Is there a way to spread these writes evenly across the 10 minutes?
>
>
> Thanks,
> Jing



Re: Flink connector class conflict problem

2022-01-25 Thread Ada Wong
https://issues.apache.org/jira/browse/FLINK-15786

Ada Wong wrote on Tue, Jan 25, 2022 at 19:40:
>
> At the moment there is no way to resolve class conflicts between multiple
> connectors when the classes cannot be shaded.
> I've opened an issue that sketches a rough solution. Is anyone more familiar
> with this area? We could discuss it further in the issue.
> https://issues.apache.org/jira/browse/FLINK-25804


Re: How to run in IDE?

2022-01-25 Thread John Smith
I'm using: final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

But no go.
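There is no authoritative answer in this thread, but one way to give the embedded MiniCluster a fixed number of slots when running from the IDE is to pass a Configuration into the environment; a sketch (the slot count 16 is arbitrary):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalRun {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Give the embedded MiniCluster enough slots for the job's parallelism.
        conf.set(TaskManagerOptions.NUM_TASK_SLOTS, 16);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // Alternatively, with flink-runtime-web on the classpath:
        // StreamExecutionEnvironment env =
        //         StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // ... build the job here ...
        env.execute("local-run");
    }
}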

On Mon, 24 Jan 2022 at 16:35, John Smith  wrote:

> Hi using Flink 1.14.3 with gradle. I explicitly added the flink client
> dependency and the job starts but it quits with...
>
> In Flink 1.10 the job worked as is. How do I set the number of slots, and
> are there any other settings for the IDE?
>
> 16:29:50,633 INFO
>  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager
>  - Received resource requirements from job
> 3a3e9c46da413071392bce161c39270f:
> [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN},
> numberOfRequiredSlots=2}]
> 16:29:50,633 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>- Sink: Print to Std. Out (14/16) (d7c4fbf5f23f3118e54998f2b35338c1)
> switched from CANCELING to CANCELED.
>


Flink POJO documentation for primitive boolean state variables

2022-01-25 Thread Makhanchan Pandey
Hi all,

For Flink to treat a model class as a special POJO type, these are the
documented conditions:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#pojos

It says the following:

   - All fields are either public or must be accessible through getter and
     setter functions. For a field called foo the getter and setter methods
     must be named getFoo() and setFoo().


We have boolean fields in our model class:
public class Input {
  private boolean open;
}

As per this, we are forced to create an accessor getOpen() while the
standard JavaBeans convention would be isOpen().

We ran some tests with both isOpen and getOpen and both seem to be
recognized as POJO.

Could we get confirmation that using isOpen() is valid here? And that this
primitive boolean case has simply not been documented at the above link?
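A quick way to reproduce the check described above, i.e. whether Flink's type extractor accepts the class as a POJO when only isOpen()/setOpen() are present (a sketch, not an official confirmation):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;

public class Input {
    private boolean open;

    public Input() {}                        // no-arg constructor, required for POJO types

    public boolean isOpen() { return open; } // JavaBeans-style accessor for a primitive boolean
    public void setOpen(boolean open) { this.open = open; }

    public static void main(String[] args) {
        TypeInformation<Input> info = TypeInformation.of(Input.class);
        // Prints "true" when the type extractor analyzed the class as a POJO
        // (i.e. it did not fall back to the generic Kryo serializer).
        System.out.println(info instanceof PojoTypeInfo);
    }
}

In a job, env.getConfig().disableGenericTypes() additionally makes any silent fallback to Kryo fail fast.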


Regards,
Mac Pandey


Re: Regarding Queryable state in Flink

2022-01-25 Thread Martijn Visser
Hi Jessy,

Queryable State is considered approaching end of life [1] per the Flink
Roadmap.

There are currently no development activities planned for it.

Best regards,

Martijn

[1]
https://flink.apache.org/roadmap.html

On Tue, 25 Jan 2022 at 18:00, Jessy Ping wrote:

> Hi Matthias,
>
> I want to query the current state of the application at real-time. Hence,
> state processor API won't fit here. I have the following questions,
>
> * Is the queryable state stable enough to use in production systems ?.
>
> Are there any improvements or development activities planned or going on
> with queryable state ? Is  Flink community still supporting queryable state
> and suggesting its usage? .
>
>
>
> Thanks Jessy
>
>
> On Tue, Jan 25, 2022, 10:19 PM Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
>> Hi Jessy,
>>
>>
>>
>> Have you considered using the state processor api [1] for offline
>> analysis of checkpoints and savepoints?
>>
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/libs/state_processor_api/
>>
>>
>>
>> Sincere greetings
>>
>>
>>
>> Thias
>>
>>
>>
>>
>>
>> *From:* Jessy Ping 
>> *Sent:* Monday, 24 January 2022 16:47
>> *To:* user 
>> *Subject:* Regarding Queryable state in Flink
>>
>>
>>
>> Hi Team,
>>
>>
>>
>> We are currently running our streaming application, based on Flink (DataStream
>> API), on a non-prod cluster, and we are planning to move it to a production
>> cluster soon. We keep certain keyed state backed by RocksDB in the Flink
>> application. We need a mechanism to query these keyed state values for
>> debugging and troubleshooting. Is it a good idea to use Queryable State for
>> a single Flink job running in application mode on Kubernetes with an average
>> load of 10k events/second?
>>
>> Or is it a better idea to keep these state values in an external k,v
>> store ?.
>>
>>
>>
>> So in short --> Is the queryable state stable enough to use in production
>> systems ?
>>
>>
>>
>>
>>
>> Thanks
>>
>> Jessy
>>
> --

Martijn Visser | Product Manager

mart...@ververica.com




Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time


RE: Flink logging

2022-01-25 Thread Oran Shuster
It's hard for me to see the issue from what you posted. However, I can post
how I added that jar to my Flink pods so you can compare.
Instead of creating a custom image, I loaded the JAR as a configMap.

You can create a configMap easily from a file:
1. Download the jar you want.
2. Create the configMap in k8s using this command: kubectl create configmap
log4j-layout-template-json-2-16-0-jar
--from-file=log4j-layout-template-json-2.16.0.jar
3. Then in Flink just mount the configMap and copy it to the lib folder with
the custom entrypoint.

As for the logging config

rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender

logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x
- %m%n
# Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
# This and all other layout settings are the important part
appender.rolling.layout.type = JsonTemplateLayout
appender.rolling.layout.compact = true
appender.rolling.layout.eventEol = true
appender.rolling.layout.stacktraceAsString = true
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name =
org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF


On 2022/01/20 14:24:32 Jessy Ping wrote:
> Hi Team,
>
> How can I use log4j-layout-template-json in flink for logging in json
> format ?.
> I have tried to add this jar into the /flink/lib/ directory with a custom
> image and I am getting the following error.
> Any insight would be appreciated.
>
> [image: image.png]
>
> Thanks
> Jessy
>


Re: Apache Fink - Adding/Removing KeyedStreams at run time without stopping the application

2022-01-25 Thread M Singh
 
Thanks Edward for your explanation. I missed the part about the aggregationKey 
being added by the processor.  On Tuesday, January 25, 2022, 02:12:41 PM EST, 
Colletta, Edward  wrote:  
 
 
Here is some sample data which may help visualize how the aggregation is 
changed dynamically. 
 
We start by aggregating by session and by session+account, placing values into 
aggregationKey based on the fields in groupByFields.  
 
Then we delete the session+account aggregation, and add an aggregation by 
account.
 
The way we are changing the aggregation dynamically is by using an indirect 
field to key by called aggregationKey which we add based on current broadcast 
state.
 
Note, this is for streaming jobs and aggregations starts fresh from the point 
at which a new groupByType is received.
 
  
 
aggregateInstruction
 
{groupByFields:[session],groupByType:bySession,action:add}
 
{groupByFields:[session,account],groupByType:bySessionAndAccount,action:add}
 
  
 
dataToAggregate
 
{session:1,account:1,value:100}
 
{session:2,account:1,value:200}
 
{session:1,account:2,value:400}
 
{session:1,account:1,value:800}
 
  
 
  
 
streamReadyToAggregate
 
{session:1,account:1,value:100,groupByFields:[session],groupByType:bySession,aggregationKey:'1'}
 
{session:1,account:1,value:100,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'1|1'}
 
{session:2,account:1,value:200,groupByFields:[session],groupByType:bySession,aggregationKey:'2'}}
 
{session:2,account:1,value:200,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'2|1'}}
 
{session:1,account:2,value:400,groupByFields:[session],groupByType:bySession,aggregationKey:'1'}
 
{session:1,account:2,value:400,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'1|2'}}
 
{session:1,account:1,value:800,groupByFields:[session],groupByType:bySession,aggregationKey:'1'}}
 
{session:1,account:1,value:800,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'1|1'}}}
 
  
 
aggregateInstruction
 
{groupByFields:[session,account],groupByType:bySessionAndAccount,action:delete}
 
{groupByFields:[account],groupByType:byAccount,action:add}
 
  
 
dataToAggregate
 
{session:3,account:1,value:1600}
 
{session:3,account:2,value:3200}
 
  
 
streamReadyToAggregate
 
{session:3,account:1,value:1600,groupByFields:[session],groupByType:bySession,aggregationKey:'3'}
 
{session:3,account:1,value:1600,groupByFields:[account],groupByType:byAccount,aggregationKey:'1'}
 
{session:3,account:1,value:3200,groupByFields:[session],groupByType:bySession,aggregationKey:'3'}
 
{session:3,account:1,value:3200,groupByFields:[account],groupByType:byAccount,aggregationKey:'2'}
 
  
 
  
 
  
 
From: Colletta, Edward  
Sent: Tuesday, January 25, 2022 1:29 PM
To: M Singh ; Caizhi Weng ; 
User-Flink 
Subject: RE: Apache Fink - Adding/Removing KeyedStreams at run time without 
stopping the application
 
  
 
You don’t have to add keyBy’s at runtime.  You change what is in the value 
of aggregationKey at run time.

Some records may appear several times with different fields extracted to 
aggregationKey.  The dynamic building of the grouping is really done by the 
flatMap.
 
  
 
  
 
From: M Singh 
Sent: Tuesday, January 25, 2022 1:12 PM
To: Caizhi Weng ; User-Flink ; 
Colletta, Edward 
Subject: Re: Apache Fink - Adding/Removing KeyedStreams at run time without 
stopping the application
 
  
 
NOTICE: This email is from an external sender -do not click on links or 
attachments unless you recognize the sender and know the content is safe.
 
  
 
Thanks Edward for your response.
 
  
 
The problem I have is that I am not sure how to add or remove keyBy's at run 
time since the flink topology is based on that (as Caizhi mentioned).
 
  
 
I believe we can change the single keyBy in your example, but not add/remove 
them.  
 
  
 
Please let me know if I have missed anything.
 
  
 
  
 
  
 
On Tuesday, January 25, 2022, 12:53:46 PM EST, Colletta, Edward 
 wrote:
 
  
 
  
 
 
 
A general pattern for dynamically adding new aggregations could be something 
like this
 
 
 
    BroadcastStream broadcastStream = 
aggregationInstructions
 
    .broadcast(broadcastStateDescriptor);
 
 
 
    DataStream 
streamReadyToAggregate = dataToAggregate
 
    .connect(broadcastStream)
 
    .process(new JoinFunction())
 
    .flatMap(new AddAggregationKeyAndDescriptor)
 
    .keyBy('aggregationKey')
 
 
 
Where
 
·   aggregationInstructions is a stream describing which fields to 
aggregate by.  It might contain a List of the field names and another 
field which can be used to describe what the aggregation is doing.   Example  
groupbyFields=[‘sourceIp’,’sourcePort’,’destIp’,’destPort’], groupingType = 
‘bySession’, action = ‘Add’ or  ‘Delete’
 
·   JoinFunction is a KeyedBroadcastProcessFunction which adds the 
groupByFields and groupingType to each message in the 

Failure Restart Strategy leads to error

2022-01-25 Thread Siddhesh Kalgaonkar
I have Flink Kafka Consumer in place which works fine until I add the below
lines:

env.setRestartStrategy(RestartStrategies.failureRateRestart(
    3,                            // max failures per interval
    Time.of(5, TimeUnit.MINUTES), // time interval for measuring the failure rate
    Time.of(10, TimeUnit.SECONDS) // delay between restart attempts
));

It gives me the below error stack trace:

DEBUG [flink-akka.actor.default-dispatcher-14] (JobMaster.java:1119) -
Close ResourceManager connection 05d80aa9f3aca06faf7be80bbc8a0642.
org.apache.flink.util.FlinkException: Stopping JobMaster for job Flink
Kafka Example(b425ae91bfb0e81980b878b3e4392137).
at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:400)
at
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
DEBUG [flink-akka.actor.default-dispatcher-12]
(DefaultJobLeaderIdService.java:148) - Remove job
b425ae91bfb0e81980b878b3e4392137 from job leader id monitoring.
 INFO [flink-akka.actor.default-dispatcher-12] (ResourceManager.java:1047)
- Disconnect job manager
a95c280817468866d08c3230ecd0462f@akka://flink/user/rpc/jobmanager_3
for job b425ae91bfb0e81980b878b3e4392137 from the resource manager.
DEBUG [flink-akka.actor.default-dispatcher-12]
(DefaultResourceTracker.java:80) - Initiating tracking of resources for job
b425ae91bfb0e81980b878b3e4392137.
DEBUG [flink-akka.actor.default-dispatcher-12]
(DefaultResourceTracker.java:60) - Stopping tracking of resources for job
b425ae91bfb0e81980b878b3e4392137.
DEBUG [flink-akka.actor.default-dispatcher-14] (AkkaRpcActor.java:131) -
The RpcEndpoint jobmanager_3 terminated successfully.
 INFO [flink-akka.actor.default-dispatcher-8]
(DefaultJobLeaderService.java:136) - Stop job leader service.
 INFO [flink-akka.actor.default-dispatcher-8]
(TaskExecutorLocalStateStoresManager.java:231) - Shutting down
TaskExecutorLocalStateStoresManager.
DEBUG [flink-akka.actor.default-dispatcher-8] (IOManagerAsync.java:121) -
Shutting down I/O manager.
Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at 

RE: Apache Fink - Adding/Removing KeyedStreams at run time without stopping the application

2022-01-25 Thread Colletta, Edward
Here is some sample data which may help visualize how the aggregation is 
changed dynamically.
We start by aggregating by session and by session+account, placing values into 
aggregationKey based on the fields in groupByFields.
Then we delete the session+account aggregation, and add an aggregation by 
account.
The way we are changing the aggregation dynamically is by using an indirect 
field to key by called aggregationKey which we add based on current broadcast 
state.
Note, this is for streaming jobs and aggregations starts fresh from the point 
at which a new groupByType is received.

aggregateInstruction
{groupByFields:[session],groupByType:bySession,action:add}
{groupByFields:[session,account],groupByType:bySessionAndAccount,action:add}

dataToAggregate
{session:1,account:1,value:100}
{session:2,account:1,value:200}
{session:1,account:2,value:400}
{session:1,account:1,value:800}


streamReadyToAggregate
{session:1,account:1,value:100,groupByFields:[session],groupByType:bySession,aggregationKey:'1'}
{session:1,account:1,value:100,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'1|1'}
{session:2,account:1,value:200,groupByFields:[session],groupByType:bySession,aggregationKey:'2'}}
{session:2,account:1,value:200,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'2|1'}}
{session:1,account:2,value:400,groupByFields:[session],groupByType:bySession,aggregationKey:'1'}
{session:1,account:2,value:400,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'1|2'}}
{session:1,account:1,value:800,groupByFields:[session],groupByType:bySession,aggregationKey:'1'}}
{session:1,account:1,value:800,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'1|1'}}}

aggregateInstruction
{groupByFields:[session,account],groupByType:bySessionAndAccount,action:delete}
{groupByFields:[account],groupByType:byAccount,action:add}

dataToAggregate
{session:3,account:1,value:1600}
{session:3,account:2,value:3200}

streamReadyToAggregate
{session:3,account:1,value:1600,groupByFields:[session],groupByType:bySession,aggregationKey:'3'}
{session:3,account:1,value:1600,groupByFields:[account],groupByType:byAccount,aggregationKey:'1'}
{session:3,account:1,value:3200,groupByFields:[session],groupByType:bySession,aggregationKey:'3'}
{session:3,account:1,value:3200,groupByFields:[account],groupByType:byAccount,aggregationKey:'2'}



From: Colletta, Edward 
Sent: Tuesday, January 25, 2022 1:29 PM
To: M Singh ; Caizhi Weng ; 
User-Flink 
Subject: RE: Apache Fink - Adding/Removing KeyedStreams at run time without 
stopping the application

You don’t have to add keyBy’s at runtime.  You change what is in the value of 
aggregationKey at run time.
Some records may appear several times with different fields extracted to 
aggregationKey.  The dynamic building of the grouping is really done by the 
flatMap.


From: M Singh mailto:mans2si...@yahoo.com>>
Sent: Tuesday, January 25, 2022 1:12 PM
To: Caizhi Weng mailto:tsreape...@gmail.com>>; User-Flink 
mailto:user@flink.apache.org>>; Colletta, Edward 
mailto:edward.colle...@fmr.com>>
Subject: Re: Apache Fink - Adding/Removing KeyedStreams at run time without 
stopping the application

NOTICE: This email is from an external sender - do not click on links or 
attachments unless you recognize the sender and know the content is safe.

Thanks Edward for your response.

The problem I have is that I am not sure how to add or remove keyBy's at run 
time since the flink topology is based on that (as Caizhi mentioned).

I believe we can change the single keyBy in your example, but not add/remove 
them.

Please let me know if I have missed anything.



On Tuesday, January 25, 2022, 12:53:46 PM EST, Colletta, Edward 
mailto:edward.colle...@fmr.com>> wrote:





A general pattern for dynamically adding new aggregations could be something 
like this



BroadcastStream broadcastStream = 
aggregationInstructions

.broadcast(broadcastStateDescriptor);



DataStream 
streamReadyToAggregate = dataToAggregate

.connect(broadcastStream)

.process(new JoinFunction())

.flatMap(new AddAggregationKeyAndDescriptor)

.keyBy('aggregationKey')



Where

·aggregationInstructions is a stream describing which fields to 
aggregate by.  It might contain a List of the field names and another 
field which can be used to describe what the aggregation is doing.   Example  
groupbyFields=[‘sourceIp’,’sourcePort’,’destIp’,’destPort’], groupingType = 
‘bySession’, action = ‘Add’ or  ‘Delete’

·JoinFunction is a KeyedBroadcastProcessFunction which adds the 
groupByFields and groupingType to each message in the dataToAggregate stream 
and possibly deletes groupings from state.

·AddAggregationKeyAndDescriptor is a FlatMapFunction which adds 
aggregationKey to the stream based on the value of groupByFields



The flatMap means one message may 

RE: Apache Fink - Adding/Removing KeyedStreams at run time without stopping the application

2022-01-25 Thread Colletta, Edward
You don’t have to add keyBy’s at runtime.  You change what is in the value of 
aggregationKey at run time.
Some records may appear several times with different fields extracted to 
aggregationKey.  The dynamic building of the grouping is really done by the 
flatMap.


From: M Singh 
Sent: Tuesday, January 25, 2022 1:12 PM
To: Caizhi Weng ; User-Flink ; 
Colletta, Edward 
Subject: Re: Apache Fink - Adding/Removing KeyedStreams at run time without 
stopping the application

NOTICE: This email is from an external sender - do not click on links or 
attachments unless you recognize the sender and know the content is safe.

Thanks Edward for your response.

The problem I have is that I am not sure how to add or remove keyBy's at run 
time since the flink topology is based on that (as Caizhi mentioned).

I believe we can change the single keyBy in your example, but not add/remove 
them.

Please let me know if I have missed anything.



On Tuesday, January 25, 2022, 12:53:46 PM EST, Colletta, Edward 
mailto:edward.colle...@fmr.com>> wrote:





A general pattern for dynamically adding new aggregations could be something 
like this



BroadcastStream broadcastStream = 
aggregationInstructions

.broadcast(broadcastStateDescriptor);



DataStream 
streamReadyToAggregate = dataToAggregate

.connect(broadcastStream)

.process(new JoinFunction())

.flatMap(new AddAggregationKeyAndDescriptor)

.keyBy('aggregationKey')



Where

·aggregationInstructions is a stream describing which fields to 
aggregate by.  It might contain a List of the field names and another 
field which can be used to describe what the aggregation is doing.   Example  
groupbyFields=[‘sourceIp’,’sourcePort’,’destIp’,’destPort’], groupingType = 
‘bySession’, action = ‘Add’ or  ‘Delete’

·JoinFunction is a KeyedBroadcastProcessFunction which adds the 
groupByFields and groupingType to each message in the dataToAggregate stream 
and possibly deletes groupings from state.

·AddAggregationKeyAndDescriptor is a FlatMapFunction which adds 
aggregationKey to the stream based on the value of groupByFields



The flatMap means one message may be emitted several times with different 
values of aggregationKey so it may belong to multiple aggregations.







From: M Singh mailto:mans2si...@yahoo.com>>
Sent: Monday, January 24, 2022 9:52 PM
To: Caizhi Weng mailto:tsreape...@gmail.com>>; User-Flink 
mailto:user@flink.apache.org>>
Subject: Re: Apache Fink - Adding/Removing KeyedStreams at run time without 
stopping the application



NOTICE: This email is from an external sender - do not click on links or 
attachments unless you recognize the sender and know the content is safe.



Hi Caizhi:



Thanks for your reply.



I need to aggregate streams based on dynamic groupings.  All the groupings 
(keyBy) are not known upfront and can be added or removed after the streaming 
application is started and I don't want to restart the application/change the 
code.  So, I wanted to find out, what are the options to achieve this 
functionality.  Please let me know if you have any advice or recommendations.



Thanks



On Monday, January 24, 2022, 04:28:32 AM EST, Caizhi Weng 
mailto:tsreape...@gmail.com>> wrote:





Hi!



Adding/removing keyed streams will change the topology graph of the job. 
Currently it is not possible to do so without restarting the job and as far as 
I know there is no existing framework/pattern to achieve this.



By the way, why do you need this functionality? Could you elaborate more on 
your use case?



M Singh mailto:mans2si...@yahoo.com>> wrote on Saturday, Jan 22, 2022 at 
21:32:

Hi Folks:



I am working on an exploratory project in which I would like to add/remove 
KeyedStreams 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#keyby)
 without restarting the Flink streaming application.



Is it possible natively in Apache Flink ?  If not, is there any 
framework/pattern which can be used to implement this without restarting the 
application/changing the code ?



Thanks


















How can I smooth out the peaks and troughs in Flink's output?

2022-01-25 Thread Jing
Hi Flink Chinese community,

I've run into the following problem: my database has a write throttle, and my Flink
app does an aggregation over a 10-minute window. As a result, a very large burst of
write requests arrives every 10 minutes, which sometimes brings down the database sink.
Is there a way to spread these writes evenly across the 10 minutes?


Thanks,
Jing


Re: Apache Fink - Adding/Removing KeyedStreams at run time without stopping the application

2022-01-25 Thread M Singh
 Thanks Edward for your response.
The problem I have is that I am not sure how to add or remove keyBy's at run 
time since the flink topology is based on that (as Caizhi mentioned).
I believe we can change the single keyBy in your example, but not add/remove 
them.  
Please let me know if I have missed anything.


On Tuesday, January 25, 2022, 12:53:46 PM EST, Colletta, Edward 
 wrote:  
 
 
  
 
A general pattern for dynamically adding new aggregations could be something 
like this
 
  
 
    BroadcastStream broadcastStream = 
aggregationInstructions
 
    .broadcast(broadcastStateDescriptor);
 
  
 
    DataStream 
streamReadyToAggregate = dataToAggregate
 
    .connect(broadcastStream)
 
    .process(new JoinFunction())
 
    .flatMap(new AddAggregationKeyAndDescriptor)
 
    .keyBy('aggregationKey')
 
  
 
Where

   - aggregationInstructions is a stream describing which fields to aggregate 
by.  It might contain a List of the field names and another field which 
can be used to describe what the aggregation is doing.   Example  
groupbyFields=[‘sourceIp’,’sourcePort’,’destIp’,’destPort’], groupingType = 
‘bySession’, action = ‘Add’ or  ‘Delete’
   - JoinFunction is a KeyedBroadcastProcessFunction which adds the 
groupByFields and groupingType to each message in the dataToAggregate stream 
and possibly deletes groupings from state.
   - AddAggregationKeyAndDescriptor is a FlatMapFunction which adds 
aggregationKey to the stream based on the value of groupByFields
 
  
 
The flatMap means one message may be emitted several times with different 
values of aggregationKey so it may belong to multiple aggregations.
 
  
 
  
 
  
 
From: M Singh  
Sent: Monday, January 24, 2022 9:52 PM
To: Caizhi Weng ; User-Flink 
Subject: Re: Apache Fink - Adding/Removing KeyedStreams at run time without 
stopping the application
 
  
 
NOTICE: This email is from an external sender -do not click on links or 
attachments unless you recognize the sender and know the content is safe.
 
  
 
Hi Caizhi:
 
  
 
Thanks for your reply.
 
  
 
I need to aggregate streams based on dynamic groupings.  All the groupings 
(keyBy) are not known upfront and can be added or removed after the streaming 
application is started and I don't want to restart the application/change the 
code.  So, I wanted to find out, what are the options to achieve this 
functionality.  Please let me know if you have any advice or recommendations.
 
  
 
Thanks
 
  
 
On Monday, January 24, 2022, 04:28:32 AM EST, Caizhi Weng 
 wrote:
 
  
 
  
 
Hi!
 
  
 
Adding/removing keyed streams will change the topology graph of the job. 
Currently it is not possible to do so without restarting the job and as far as 
I know there is no existing framework/pattern to achieve this.
 
  
 
By the way, why do you need this functionality? Could you elaborate more on 
your use case?
 
  
 
M Singh wrote on Saturday, Jan 22, 2022 at 21:32:
 

Hi Folks:
 
  
 
I am working on an exploratory project in which I would like to add/remove 
KeyedStreams 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#keyby)
 without restarting the Flink streaming application.
 
  
 
Is it possible natively in Apache Flink ?  If not, is there any 
framework/pattern which can be used to implement this without restarting the 
application/changing the code ?
 
  
 
Thanks
 
  
 
  
 
  
 
  
 
  
 
  
 
  
 
  
 
  

RE: Apache Fink - Adding/Removing KeyedStreams at run time without stopping the application

2022-01-25 Thread Colletta, Edward

A general pattern for dynamically adding new aggregations could be something 
like this

BroadcastStream broadcastStream = 
aggregationInstructions
.broadcast(broadcastStateDescriptor);

DataStream 
streamReadyToAggregate = dataToAggregate
.connect(broadcastStream)
.process(new JoinFunction())
.flatMap(new AddAggregationKeyAndDescriptor)
.keyBy('aggregationKey')

Where

  *   aggregationInstructions is a stream describing which fields to aggregate 
by.  It might contain a List of the field names and another field which 
can be used to describe what the aggregation is doing.   Example  
groupbyFields=[‘sourceIp’,’sourcePort’,’destIp’,’destPort’], groupingType = 
‘bySession’, action = ‘Add’ or  ‘Delete’
  *   JoinFunction is a KeyedBroadcastProcessFunction which adds the 
groupByFields and groupingType to each message in the dataToAggregate stream 
and possibly deletes groupings from state.
  *   AddAggregationKeyAndDescriptor is a FlatMapFunction which adds 
aggregationKey to the stream based on the value of groupByFields

The flatMap means one message may be emitted several times with different 
values of aggregationKey so it may belong to multiple aggregations.
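For readers who want to try the pattern end to end, here is a compressed, hypothetical sketch that folds the JoinFunction and the flatMap described above into a single BroadcastProcessFunction. All class, field, and state names (Event, AggregateInstruction, KeyedEvent, "groupings") are made up, and the thread's own design keeps the two steps separate and uses a KeyedBroadcastProcessFunction:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DynamicGroupingSketch {

    public static class AggregateInstruction {
        public String groupByType;          // e.g. "bySession"
        public List<String> groupByFields;  // e.g. ["session", "account"]
        public String action;               // "add" or "delete"
    }

    public static class Event {
        public Map<String, String> fields;  // e.g. {session=1, account=1, value=100}
    }

    public static class KeyedEvent {
        public Event event;
        public String groupByType;
        public String aggregationKey;       // e.g. "1|1"
    }

    public static KeyedStream<KeyedEvent, String> keyDynamically(
            DataStream<Event> dataToAggregate,
            DataStream<AggregateInstruction> aggregationInstructions) {

        MapStateDescriptor<String, AggregateInstruction> groupings =
                new MapStateDescriptor<>(
                        "groupings", Types.STRING, TypeInformation.of(AggregateInstruction.class));

        BroadcastStream<AggregateInstruction> broadcastStream =
                aggregationInstructions.broadcast(groupings);

        return dataToAggregate
                .connect(broadcastStream)
                .process(
                        new BroadcastProcessFunction<Event, AggregateInstruction, KeyedEvent>() {
                            @Override
                            public void processElement(
                                    Event value, ReadOnlyContext ctx, Collector<KeyedEvent> out)
                                    throws Exception {
                                // Emit one copy of the event per currently active grouping.
                                for (Map.Entry<String, AggregateInstruction> e :
                                        ctx.getBroadcastState(groupings).immutableEntries()) {
                                    KeyedEvent k = new KeyedEvent();
                                    k.event = value;
                                    k.groupByType = e.getKey();
                                    k.aggregationKey =
                                            e.getValue().groupByFields.stream()
                                                    .map(f -> value.fields.get(f))
                                                    .collect(Collectors.joining("|"));
                                    out.collect(k);
                                }
                            }

                            @Override
                            public void processBroadcastElement(
                                    AggregateInstruction instr, Context ctx, Collector<KeyedEvent> out)
                                    throws Exception {
                                if ("delete".equalsIgnoreCase(instr.action)) {
                                    ctx.getBroadcastState(groupings).remove(instr.groupByType);
                                } else {
                                    ctx.getBroadcastState(groupings).put(instr.groupByType, instr);
                                }
                            }
                        })
                // Downstream keyed aggregation (windows, KeyedProcessFunction, ...) goes here.
                .keyBy(k -> k.groupByType + ":" + k.aggregationKey);
    }
}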



From: M Singh 
Sent: Monday, January 24, 2022 9:52 PM
To: Caizhi Weng ; User-Flink 
Subject: Re: Apache Fink - Adding/Removing KeyedStreams at run time without 
stopping the application

NOTICE: This email is from an external sender - do not click on links or 
attachments unless you recognize the sender and know the content is safe.

Hi Caizhi:

Thanks for your reply.

I need to aggregate streams based on dynamic groupings.  All the groupings 
(keyBy) are not known upfront and can be added or removed after the streaming 
application is started and I don't want to restart the application/change the 
code.  So, I wanted to find out, what are the options to achieve this 
functionality.  Please let me know if you have any advice or recommendations.

Thanks

On Monday, January 24, 2022, 04:28:32 AM EST, Caizhi Weng 
mailto:tsreape...@gmail.com>> wrote:


Hi!

Adding/removing keyed streams will change the topology graph of the job. 
Currently it is not possible to do so without restarting the job and as far as 
I know there is no existing framework/pattern to achieve this.

By the way, why do you need this functionality? Could you elaborate more on 
your use case?

M Singh mailto:mans2si...@yahoo.com>> wrote on Saturday, Jan 22, 2022 at 
21:32:
Hi Folks:

I am working on an exploratory project in which I would like to add/remove 
KeyedStreams 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#keyby)
 without restarting the Flink streaming application.

Is it possible natively in Apache Flink ?  If not, is there any 
framework/pattern which can be used to implement this without restarting the 
application/changing the code ?

Thanks










Re: Regarding Queryable state in Flink

2022-01-25 Thread Jessy Ping
Hi Matthias,

I want to query the current state of the application at real-time. Hence,
state processor API won't fit here. I have the following questions,

* Is the queryable state stable enough to use in production systems ?.

Are there any improvements or development activities planned or going on
with queryable state ? Is  Flink community still supporting queryable state
and suggesting its usage? .



Thanks Jessy


On Tue, Jan 25, 2022, 10:19 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Jessy,
>
>
>
> Have you considered using the state processor api [1] for offline analysis
> of checkpoints and savepoints?
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/libs/state_processor_api/
>
>
>
> Sincere greetings
>
>
>
> Thias
>
>
>
>
>
> *From:* Jessy Ping 
> *Sent:* Monday, 24 January 2022 16:47
> *To:* user 
> *Subject:* Regarding Queryable state in Flink
>
>
>
> Hi Team,
>
>
>
> We are currently running our streaming application, based on Flink (DataStream
> API), on a non-prod cluster, and we are planning to move it to a production
> cluster soon. We keep certain keyed state backed by RocksDB in the Flink
> application. We need a mechanism to query these keyed state values for
> debugging and troubleshooting. Is it a good idea to use Queryable State for
> a single Flink job running in application mode on Kubernetes with an average
> load of 10k events/second?
>
> Or is it a better idea to keep these state values in an external k,v store
> ?.
>
>
>
> So in short --> Is the queryable state stable enough to use in production
> systems ?
>
>
>
>
>
> Thanks
>
> Jessy
>


RE: Regarding Queryable state in Flink

2022-01-25 Thread Schwalbe Matthias
Hi Jessy,

Have you considered using the state processor api [1] for offline analysis of 
checkpoints and savepoints?

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/libs/state_processor_api/

Sincere greetings

Thias


From: Jessy Ping 
Sent: Monday, 24 January 2022 16:47
To: user 
Subject: Regarding Queryable state in Flink

Hi Team,

We are currently running our streaming application, based on Flink (DataStream API), 
on a non-prod cluster, and we are planning to move it to a production cluster soon. We 
keep certain keyed state backed by RocksDB in the Flink application. We 
need a mechanism to query these keyed state values for debugging and 
troubleshooting. Is it a good idea to use Queryable State for a single Flink job 
running in application mode on Kubernetes with an average load of 10k 
events/second?
Or is it a better idea to keep these state values in an external k,v store ?.

So in short --> Is the queryable state stable enough to use in production 
systems ?


Thanks
Jessy


Re: Upgrade to 1.14.3

2022-01-25 Thread Sweta Kalakuntla
Hi Ingo,

So basically, I cannot deploy an older-version Flink job on a 1.14.3 Flink
cluster, is that right?

Thanks,
Sweta

On Tue, Jan 25, 2022 at 4:02 AM Ingo Bürk  wrote:

> Hi Sweta,
>
> there was a non-compatible change to SourceReaderContext#metricGroup in
> the 1.14.x release line; I assume this is what you are seeing.
>
> Did you make sure to update the connector (and any other) dependencies
> as well?
>
>
> Best
> Ingo
>
> On 25.01.22 05:36, Sweta Kalakuntla wrote:
> > Hi,
> >
> > We are on Flink 1.13.3 and trying to upgrade the cluster to 1.14.3.
> > I see that the job (on 1.13.3) is unable to start up because it says
> > it couldn't find the metrics group (inside the FlinkKafkaConsumer class).
> >
> > - Can I deploy a 1.13.3 job on a 1.14.3 cluster?
> > - Can I deploy a 1.14.3 job on a 1.13.3 cluster?
> > - How do I upgrade to a 1.14.3 cluster without losing the running apps' state?
> > I have even tried taking a savepoint, but that did not revive the job.
> >
> > Thank you,
> > Sweta
>


Re: [statefun] client cert auth in remote function

2022-01-25 Thread Igal Shilman
Hi Fil,

If I understand correctly, you are looking for TLS client authentication,
i.e. the remote function needs to authenticate the request
that is coming from the StateFun runtime.
This is indeed not yet supported as it wasn't required by the community.
I'd be happy to create an issue and assign you to work on that if you are
still interested!

Cheers,
Igal.


On Mon, Jan 24, 2022 at 11:32 PM Filip Karnicki 
wrote:

> Cool, thanks! I'll speak to the shared cluster support team to see if they
> can install our CA cert on every box. So we've got that side of
> authentication sorted - flink can trust that the service is who it says it
> is.
>
> How about the other way around? Any thoughts on how I could provide a
> *key*store for the stateful functions job to use while calling remote
> function services with TLS? The remote function server (undertow in our
> case) needs to authenticate and authorise statefun based on the latter's
> cert.
>
> Many thanks
> Fil
>
> On Mon, 24 Jan 2022 at 21:25, Igal Shilman  wrote:
>
>> Hello Filip,
>>
>> As far as I know SslContextBuilder.forClient() should use the default
>> trust store, so if you will install your self signed certificate in the
>> community supported container image this should be picked up[1].
>> The following user has reported something similar, and it seems that
>> they've gone down a similar path [2].
>>
>> Cheers,
>> Igal.
>>
>> [1] https://stackoverflow.com/a/35304873/4405470
>> [2] https://lists.apache.org/thread/nxf7yk5ctcvndyygnvx7l34gldn0xgj3
>>
>>
>> On Mon, Jan 24, 2022 at 7:08 PM Filip Karnicki 
>> wrote:
>>
>>> Hi All!
>>>
>>> I was wondering if there's a way to secure a remote function by
>>> requiring the client (flink) to use a client cert. Preferably a base64
>>> encoded string from the env properties, but that might be asking for a lot
>>> :)
>>>
>>> I had a look at the code, and NettySharedResources seems to use
>>> SslContextBuilder.forClient(), and doesn't really seem to deal with setting
>>> any kind of a keystore
>>>
>>> Also, I don't think that setting
>>> -Djavax.net.ssl.trustStore=path/to/truststore.jks does anything, since I
>>> keep getting 'unable to find valid certification path to requested target',
>>> while an exported .pem from my tuststore works fine as a CA in postman
>>>
>>> I'm happy to contribute some code if need be, just point me in the right
>>> direction
>>>
>>> Kind regards,
>>> Fil
>>>
>>


Flink connector class conflict problem

2022-01-25 Thread Ada Wong
At the moment there is no way to resolve class conflicts between multiple connectors when the classes cannot be shaded.
I've opened an issue that sketches a rough solution. Is anyone more familiar with this area? We could discuss it further in the issue.
https://issues.apache.org/jira/browse/FLINK-25804


Example for Jackson JsonNode Kafka serialization schema

2022-01-25 Thread Oran Shuster
In the documentation we have an example on how to implement deserialization
from bytes to Jackson ObjectNode objects - JSONKeyValueDeserializationSchema
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

However, there is no example on the other direction: Taking an
ObjectNode/JsonNode (or just any POJO class) and using Jackson to serialize
it to string

You can write a simple schema like so:


public class JSONKafkaSerializationSchema implements
        KafkaSerializationSchema<JsonNode> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public ProducerRecord<byte[], byte[]> serialize(JsonNode element,
                                                    @Nullable Long timestamp) {
        String topic = getTargetTopic(element);

        byte[] value;

        try {
            value = objectMapper.writeValueAsBytes(element);
            return new ProducerRecord<>(topic, value);
        } catch (JsonProcessingException e) {
            return null;
        }
    }

    private String getTargetTopic(JsonNode jsonNode) {
        return jsonNode.get("topic").asText();
    }
}

But this raises a question - what should we do when serialization fails?
If the input class is a simple POJO, then Jackson should always succeed in
converting it to bytes, but that's not 100% guaranteed.
In case of failure, can we return null so the record will be discarded?
Null values are discarded in the case of the deserialization schema; from
the documentation: "Returns: The deserialized message as an object (null
if the message cannot be deserialized)."
If this is not possible, what is the proper way to serialize Jackson objects
into bytes in Flink? It's possible to convert everything to String before
the Kafka producer, but then any logic to determine the topic we need to
send to will need to deserialize the string again.


Re: ParquetColumnarRowInputFormat - parameter description

2022-01-25 Thread Fabian Paul
Hi Krzysztof,

sorry for the late reply. The community is very busy at the moment
with the final two weeks of Flink 1.15.

The parameters you have mentioned are mostly relevant for the internal
conversion or representation from Parquet types to Flink's SQL type
system.

- isUtcTimestamp denotes whether timestamps should be represented as
SQL UTC timestamps
- batchSize is an internal number of how many rows are put into one
vector. Vectors are used internally in Flink SQL for performance
reasons to enable faster execution on batches i.e. for Hive we use the
following default value [1]
- isCaseSensitive is used to map the field/column names from parquet
and match them to columns in Flink

I have also included @jingsongl...@gmail.com who is more familiar with
the parquet format.

Best,
Fabian

[1] 
https://github.com/apache/flink/blob/d8a031c2b7d7b73fe38a3f894913d3dcaa5a4111/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/columnar/vector/VectorizedColumnBatch.java#L46
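For reference, constructing the format with these parameters looks roughly like this against the 1.14-era API (column names, types, and paths are made up; newer releases add a produced-type argument to the constructor, so check the signature of your version):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

public class ParquetReadSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Projected schema: only these columns are read from the files.
        RowType rowType =
                RowType.of(
                        new LogicalType[] {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)},
                        new String[] {"id", "name"});

        ParquetColumnarRowInputFormat<FileSourceSplit> format =
                new ParquetColumnarRowInputFormat<>(
                        new Configuration(), // Hadoop configuration (e.g. for S3/HDFS settings)
                        rowType,
                        500,    // batchSize: rows per internal columnar batch
                        false,  // isUtcTimestamp: interpret timestamps as UTC
                        true);  // isCaseSensitive: match column names case-sensitively

        FileSource<RowData> source =
                FileSource.forBulkFileFormat(format, new Path("file:///tmp/parquet-input")).build();

        DataStream<RowData> rows =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-source");

        rows.print();
        env.execute("parquet-read-sketch");
    }
}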

On Mon, Jan 24, 2022 at 4:32 PM Krzysztof Chmielewski
 wrote:
>
> Hi,
> I would like to bump this up a little bit.
>
> The isCaseSensitive  is rather clear. If this is false, then column read in 
> parquet file is case insensitive.
> batchSize - how many records we read from the Parquet file before passing it 
> to the upper classes right?
>
> Could someone describe what  timestamp flab does with some examples?
>
> Regards,
> Krzysztof Chmielewski
>
>
> pon., 10 sty 2022 o 14:59 Krzysztof Chmielewski 
>  napisał(a):
>>
>> Hi,
>> I would like to ask for some more details regarding three 
>> ParquetColumnarRowInputFormat contruction parameters.
>>
>> The parameters are:
>> batchSize,
>> isUtcTimestamp,
>> isCaseSensitive
>>
>> The parametr names gives some hint about their purpose but there is no 
>> description in docs (java, flink page).
>>
>> Could you provide me some information about the batching process and other 
>> two boolean flags?
>>
>> Regards,
>> Krzysztof Chmielewski


Re: When are TaskManager slots released?

2022-01-25 Thread Zhilong Hong
Hello, johnjlong:

TaskExecutor#cancel is an RPC call and carries no information about whether the TM is
still alive. TM liveness is detected by the heartbeat service; the heartbeat.timeout
option [1] currently defaults to 50s, while the RPC timeout option akka.ask.timeout [2]
defaults to 10s. If you want to detect a lost TM sooner, you can lower both options,
but that may make the cluster or jobs unstable.

There is already a community discussion about lowering the heartbeat timeout; see [3] and [4].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#heartbeat-timeout
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#akka-ask-timeout
[3] https://issues.apache.org/jira/browse/FLINK-23403
[4] https://issues.apache.org/jira/browse/FLINK-23209

Sincerely,
Zhilong

On Tue, Jan 25, 2022 at 10:06 AM johnjlong  wrote:

> Hi all, I'd like to ask a question.
> When I proactively release a TM's connection by ResourceID, I see that the TM's slots are only marked as free.
>
> They are not actually released until the JobMaster cancels the whole ExecutionGraph, at which point it calls the cancel method on the TM hosting each vertex's slot, one by one.
> But by then the associated TM has already been closed, so the RPC times out (20s by default), and only then is the slot released.
>
>
> My question: why not check whether the TM is still alive before calling TaskExecutor#cancelTask? If it is not alive, go straight through the cancel flow instead of waiting for the RPC timeout before taking the next step.
>
> Log screenshot attached:
>
> johnjlong
> johnjl...@163.com
>
> 
> Signature customized with NetEase Mail Master
>


Re: Upgrade to 1.14.3

2022-01-25 Thread Ingo Bürk

Hi Sweta,

there was a non-compatible change to SourceReaderContext#metricGroup in 
the 1.14.x release line; I assume this is what you are seeing.


Did you make sure to update the connector (and any other) dependencies 
as well?



Best
Ingo

On 25.01.22 05:36, Sweta Kalakuntla wrote:

Hi,

We are on Flink 1.13.3 and trying to upgrade the cluster to 1.14.3.
I see that the job (on 1.13.3) is unable to start up because it says
it couldn't find the metrics group (inside the FlinkKafkaConsumer class).


- Can I deploy a 1.13.3 job on a 1.14.3 cluster?
- Can I deploy a 1.14.3 job on a 1.13.3 cluster?
- How do I upgrade to a 1.14.3 cluster without losing the running apps' state?
I have even tried taking a savepoint, but that did not revive the job.


Thank you,
Sweta


Re: Flink Kinesis connector - EFO connection error with http proxy settings

2022-01-25 Thread Danny Cranmer
Hey Saravanan,

Please read the contribution guide [1]. It is a good idea to review the
code style guidelines [2] to reduce PR churn for nits.

If you can please raise a Jira and mention me, I will assign it to you.

[1] https://flink.apache.org/contributing/how-to-contribute.html
[2]
https://flink.apache.org/contributing/code-style-and-quality-preamble.html

Thanks,

On Sun, Jan 23, 2022 at 6:20 PM Gnanamoorthy, Saravanan <
saravanan.gnanamoor...@fmr.com> wrote:

> Hi Danny,
>
> I should be able to make the contribution to add proxy support. Please let
> me know the contribution process.
>
>
>
> Thanks
>
> -Saravan
>
>
>
> *From: *Danny Cranmer 
> *Date: *Wednesday, January 19, 2022 at 3:10 AM
> *To: *Gnanamoorthy, Saravanan 
> *Cc: *user@flink.apache.org 
> *Subject: *Re: Flink Kinesis connector - EFO connection error with http
> proxy settings
>
> *NOTICE: This email is from an external sender - do not click on links or
> attachments unless you recognize the sender and know the content is safe. *
>
>
>
> Hello Saravanan,
>
>
>
> Yes you are correct. EFO uses AWS SDK v2 and the builder does not set
> proxy configuration [1]. The polling (non EFO) mechanism is using AWS SDK
> v1 which has a more general configuration deserialiser, and hence proxy is
> configurable. I do not believe there is a workaround for this without
> modifying the connector.
>
>
>
> If you are in a position to make a contribution to add support, we would
> appreciate this. Otherwise I can take this one. Please let me know your
> thoughts.
>
>
>
> [1]
> https://github.com/apache/flink/blob/release-1.13/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java#L113
>
>
>
> Thanks,
>
> Danny Cranmer.
>
>
>
> On Tue, Jan 18, 2022 at 12:52 AM Gnanamoorthy, Saravanan <
> saravanan.gnanamoor...@fmr.com> wrote:
>
> Hello,
>
> We are using the Flink Kinesis connector to process streaming data
> from Kinesis. We are running the application behind a proxy. With the
> proxyhost and proxyport settings, the connector works with the default
> publisher type (Polling), but it doesn't work when we enable the publisher
> type Enhanced Fan-Out (EFO). We tried different connector versions,
> but the behaviour is the same. I am wondering if the proxy settings are
> ignored for the EFO type. I am looking forward to your
> feedback/recommendations.
>
>
>
> Flink version: 1.3.5
>
> Java version: 11
>
>
>
> Here is the error log:
>
> 2022-01-17 18:59:20,707 WARN  org.apache.flink.runtime.taskmanager.Task
>   [] - Source: Custom Source -> Sink: Print to Std. Out
> (1/1)#0 (fbb512e099d031470403965ba1830e8c) switched from RUNNING to FAILED
> with failure cause:
> org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil$FlinkKinesisStreamConsumerRegistrarException:
> Error registering stream: a367945-consumer-stream-dit
>
> at
> org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:125)
>
> at
> org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:106)
>
> at
> org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(StreamConsumerRegistrarUtil.java:75)
>
> at
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.(KinesisDataFetcher.java:429)
>
> at
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.(KinesisDataFetcher.java:365)
>
> at
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.createFetcher(FlinkKinesisConsumer.java:536)
>
> at
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:308)
>
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>
> Suppressed: java.lang.NullPointerException
>
> at
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)
>
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
>
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:864)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:843)
>
> at
> 
