hbase NoSuchMethodError: org.apache.hadoop.hbase.client.HTable.getTableName()[B

2022-01-26 Thread 潘明文
Hi,

The hbase-client dependency is version 2.1.0 and Flink is 1.12.4.
The HBase code (the class extends TableInputFormat) is as follows:
try {
    connection = ConnectionFactory.createConnection(hbaseConf);
    // Table table = connection.getTable(TableName.valueOf(tableName));
    table = (HTable) connection.getTable(TableName.valueOf(tableName));
} catch (IOException e) {
    logger.error("HBase connection error", e.getCause());
    System.out.println("--");
}
System.out.println("--aaa");
scan = new Scan().addFamily(Bytes.toBytes(family));
scan.withStartRow(startRow.getBytes());
scan.withStopRow(endRow.getBytes());
System.out.println("--");
The error is as follows:
 Exception in thread "main" org.apache.flink.util.FlinkException: Failed to 
execute job 'Flink Streaming Job'.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1796)
at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1765)
at com.example.app.hbasesource.main(hbasesource.java:25)
Caused by: java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobInitializationException: Could not 
instantiate JobManager.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
not instantiate JobManager.
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits 
caused an error: org.apache.hadoop.hbase.client.HTable.getTableName()[B
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:260)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:866)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:257)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:322)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:276)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:133)
at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)
at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:342)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:327)
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)
at 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:162)
at 
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478)
... 4 more
Caused by: java.lang.NoSuchMethodError: 
org.apache.hadoop.hbase.client.HTable.getTableName()[B
at 
org.apache.flink.addons.hbase.AbstractTableInputFormat.createInputSplits(AbstractTableInputFormat.java:232)
at 
org.apache.flink.addons.hbase.AbstractTableInputFormat.createInputSplits(AbstractTableInputFormat.java:44)
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:247)
... 18 more

Questions about checkpoint retention

2022-01-26 Thread Chen-Che Huang
Hi all,


To minimize recovery time after a failure, we use incremental, retained
checkpoints with `state.checkpoints.num-retained` set to 10 in our Flink apps.
With this setting, Flink automatically creates new checkpoints at regular
intervals and keeps only the latest 10 checkpoints. In addition, for app
upgrades and better reliability, we have a cron job that creates savepoints at
regular intervals.
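
A minimal sketch of this kind of setup, assuming the standard configuration
options (the backend choice and the concrete values below are illustrative, not
our exact production settings):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointJob {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Keep only the 10 most recent completed checkpoints
        // (same as state.checkpoints.num-retained: 10 in flink-conf.yaml).
        conf.setInteger("state.checkpoints.num-retained", 10);
        // Incremental checkpoints need a backend that supports them, e.g. RocksDB.
        conf.setBoolean("state.backend.incremental", true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Checkpoint every 30 minutes.
        env.enableCheckpointing(30 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE);
        // Keep completed checkpoints around when the job is cancelled, so they
        // are only removed by the num-retained policy.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // ... sources, transformations, sinks ...
        env.execute("retained-checkpoint-job");
    }
}
```

The savepoints taken by the cron job are separate, user-owned artifacts and are
not counted against num-retained.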



We have two questions about checkpoint retention.

   1. When our cron job creates a savepoint (call it SP), it seems that
   checkpoints created earlier than SP still cannot be deleted. We assumed that
   new checkpoints would be generated based on SP, so the checkpoints taken
   before SP would become useless. However, the checkpoint mechanism doesn't
   seem to work that way. Is our understanding correct?
   2. To save storage cost, we'd like to know which checkpoints can be deleted.
   Currently, each version of our app has 10 checkpoints. Can we delete the
   checkpoints generated by previous versions of our app?


Any comment is appreciated!


Best wishes,

Chen-Che


An example is below (a checkpoint is created every 30 minutes, while a
savepoint is created every 2 hours):

1:00 Flink creates checkpoint

1:30 Flink creates checkpoint

2:00 Flink creates checkpoint

2:30 Flink creates checkpoint

3:00 Cron job creates savepoint (SP)

3:30 Flink creates checkpoint

4:00 Flink creates checkpoint

...


Re: Is it possible to support many different windows for many different keys?

2022-01-26 Thread Alexander Fedulov
>
> Again, thank you for your input.

You are welcome.

> I want the stream element to define the window.

Got it, that was the missing bit of detail. That is also doable - not with
the Windows API, but with the lower-level ProcessFunction.
Check out my blog post [1], especially its third part [2]. Window
handling in that case is driven by external rules rather than by the
original events themselves, but this material should give you
enough inspiration to implement your required custom logic.

[1] https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
[2] https://flink.apache.org/news/2020/07/30/demo-fraud-detection-3.html
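
To make that concrete, a rough sketch of element-driven windowing with a
KeyedProcessFunction might look like this (the Event and Result types and the
per-element windowSizeMs field are assumptions for illustration):

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Event(name, timestamp, value, windowSizeMs) and Result(name, windowEnd, avg)
// are assumed POJOs; each element carries the window size it wants.
public class ElementDefinedWindowFunction
        extends KeyedProcessFunction<String, Event, Result> {

    // window end -> [sum, count], kept per key
    private transient MapState<Long, double[]> windows;

    @Override
    public void open(Configuration parameters) {
        windows = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("windows", Long.class, double[].class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) throws Exception {
        // The stream element itself defines the window it belongs to.
        long windowEnd = (event.timestamp / event.windowSizeMs + 1) * event.windowSizeMs;
        double[] agg = windows.get(windowEnd);
        if (agg == null) {
            agg = new double[]{0.0, 0.0};
        }
        agg[0] += event.value;
        agg[1] += 1;
        windows.put(windowEnd, agg);
        // Emit the window once the event-time watermark passes its end.
        ctx.timerService().registerEventTimeTimer(windowEnd);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
        double[] agg = windows.get(timestamp);
        if (agg != null) {
            out.collect(new Result(ctx.getCurrentKey(), timestamp, agg[0] / agg[1]));
            windows.remove(timestamp);
        }
    }
}
```

Keying the enriched stream by the time-series name and processing it with such
a function gives each element its own window size, without a separate
deployment per window.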

Best,
Alexander Fedulov

On Wed, Jan 26, 2022 at 10:45 PM Marco Villalobos 
wrote:

> Hi Alexander,
>
> Thank you for responding. The solution you proposed uses statically
> defined windows. What I need are dynamically created windows determined
> by metadata in the stream element.
>
> I want the stream element to define the window.
>
> That’s what I’m trying to research, or an alternate solution.
>
> Again, thank you for your input.
>
> On Jan 26, 2022, at 1:32 PM, Alexander Fedulov 
> wrote:
>
> 
> Hi Marco,
>
> Not sure if I get your problem correctly, but you can process those
> windows on data "split" from the same input within the same Flink job.
> Something along these lines:
>
> DataStream stream = ...
> DataStream a = stream.filter( /* time series name == "a" */);
> a.keyBy(...).window(TumblingEventTimeWindows.of(Time.minutes(1)));
>
> DataStream b = stream.filter( /* time series name == "b" */);
> b.keyBy(...).window(TumblingEventTimeWindows.of(Time.minutes(5)));
>
> If needed, you can then union all of the separate results streams together.
> a.union(b, c ...);
>
> There is no need for separate Flink deployments to create such a pipeline.
>
> Best,
> Alexander Fedulov
>
> On Wed, Jan 26, 2022 at 6:47 PM Marco Villalobos <
> mvillalo...@kineteque.com> wrote:
>
>> Hi,
>>
>> I am working with time series data in the form of (timestamp, name,
>> value), and an event time that is the timestamp when the data was published
>> onto kafka, and I have a business requirement in which each stream element
>> becomes enriched, and then processing requires different time series names
>> to be processed in different windows with different time averages.
>>
>> For example, time series with name "a"
>>
>> might require a one minute window, and five minute window.
>>
>> time series with name "b" requires no windowing.
>>
>> time series with name "c" requires a two minute window and 10 minute
>> window.
>>
>> Does flink support this style of windowing?  I think it doesn't.  Also,
>> does any streaming platform support that type of windowing?
>>
>> I was thinking that this type of windowing support might require a
>> different flink deployment per each window.  Would that scale though, if
>> there are tens of thousands of time series names / windows?
>>
>> Any help or advice would be appreciated. Thank you.
>>
>> Marco A. Villalobos
>>
>>
>>


Re: Is it possible to support many different windows for many different keys?

2022-01-26 Thread Marco Villalobos
Hi Alexander,

Thank you for responding. The solution you proposed uses statically defined
windows. What I need are dynamically created windows determined by metadata
in the stream element.

I want the stream element to define the window.

That’s what I’m trying to research, or an alternate solution.

Again, thank you for your input.

> On Jan 26, 2022, at 1:32 PM, Alexander Fedulov  
> wrote:
> 
> 
> Hi Marco,
> 
> Not sure if I get your problem correctly, but you can process those windows 
> on data "split" from the same input within the same Flink job.
> Something along these lines:
> 
> DataStream stream = ...
> DataStream a = stream.filter( /* time series name == "a" */);
> a.keyBy(...).window(TumblingEventTimeWindows.of(Time.minutes(1)));
> 
> DataStream b = stream.filter( /* time series name == "b" */);
> b.keyBy(...).window(TumblingEventTimeWindows.of(Time.minutes(5)));
> 
> If needed, you can then union all of the separate results streams together.
> a.union(b, c ...);
> 
> There is no need for separate Flink deployments to create such a pipeline.
> 
> Best,
> Alexander Fedulov
> 
>> On Wed, Jan 26, 2022 at 6:47 PM Marco Villalobos  
>> wrote:
>> Hi,
>> 
>> I am working with time series data in the form of (timestamp, name, value), 
>> and an event time that is the timestamp when the data was published onto 
>> kafka, and I have a business requirement in which each stream element 
>> becomes enriched, and then processing requires different time series names 
>> to be processed in different windows with different time averages.
>> 
>> For example, time series with name "a"
>> 
>> might require a one minute window, and five minute window.
>> 
>> time series with name "b" requires no windowing.
>> 
>> time series with name "c" requires a two minute window and 10 minute window.
>> 
>> Does flink support this style of windowing?  I think it doesn't.  Also, does 
>> any streaming platform support that type of windowing?
>> 
>> I was thinking that this type of windowing support might require a different 
>> flink deployment per each window.  Would that scale though, if there are 
>> tens of thousands of time series names / windows?
>> 
>> Any help or advice would be appreciated. Thank you.
>> 
>> Marco A. Villalobos
>> 
>> 


Re: Is it possible to support many different windows for many different keys?

2022-01-26 Thread Alexander Fedulov
Hi Marco,

Not sure if I get your problem correctly, but you can process those windows
on data "split" from the same input within the same Flink job.
Something along these lines:

DataStream stream = ...
DataStream a = stream.filter( /* time series name == "a" */);
a.keyBy(...).window(TumblingEventTimeWindows.of(Time.minutes(1)));

DataStream b = stream.filter( /* time series name == "b" */);
b.keyBy(...).window(TumblingEventTimeWindows.of(Time.minutes(5)));

If needed, you can then union all of the separate results streams together.
a.union(b, c ...);

There is no need for separate Flink deployments to create such a pipeline.

Best,
Alexander Fedulov

On Wed, Jan 26, 2022 at 6:47 PM Marco Villalobos 
wrote:

> Hi,
>
> I am working with time series data in the form of (timestamp, name,
> value), and an event time that is the timestamp when the data was published
> onto kafka, and I have a business requirement in which each stream element
> becomes enriched, and then processing requires different time series names
> to be processed in different windows with different time averages.
>
> For example, time series with name "a"
>
> might require a one minute window, and five minute window.
>
> time series with name "b" requires no windowing.
>
> time series with name "c" requires a two minute window and 10 minute
> window.
>
> Does flink support this style of windowing?  I think it doesn't.  Also,
> does any streaming platform support that type of windowing?
>
> I was thinking that this type of windowing support might require a
> different flink deployment per each window.  Would that scale though, if
> there are tens of thousands of time series names / windows?
>
> Any help or advice would be appreciated. Thank you.
>
> Marco A. Villalobos
>
>
>


Unbounded streaming with table API and large json as one of the columns

2022-01-26 Thread HG
Hi,

I need to calculate elapsed times between steps of a transaction.
Each step is an event. All steps belonging to a single transaction have the
same transaction id. Every event has a handling time.
All information is part of a large JSON structure.
But I can have the incoming source supply transactionId and handlingTime
separately. That would save me retrieving the windowing key (transactionId) and
handlingTime out of the nested JSON.
Basically I want to use the SQL API to do:

select transactionId
     , handlingTime - previousHandlingTime as elapsedTime
     , largeJSON
from (
    select transactionId
         , handlingTime
         , lag(handlingTime) over (partition by transactionId order by handlingTime) as previousHandlingTime
         , largeJSON
    from source
)

The largeJSON can be about 100K.
Would this work?

Regards Hans-Peter


Is it possible to support many different windows for many different keys?

2022-01-26 Thread Marco Villalobos
Hi,

I am working with time series data in the form of (timestamp, name, value),
and an event time that is the timestamp when the data was published onto
kafka, and I have a business requirement in which each stream element
becomes enriched, and then processing requires different time series names
to be processed in different windows with different time averages.

For example, time series with name "a"

might require a one minute window, and five minute window.

time series with name "b" requires no windowing.

time series with name "c" requires a two minute window and 10 minute window.

Does flink support this style of windowing?  I think it doesn't.  Also,
does any streaming platform support that type of windowing?

I was thinking that this type of windowing support might require a
different flink deployment per each window.  Would that scale though, if
there are tens of thousands of time series names / windows?

Any help or advice would be appreciated. Thank you.

Marco A. Villalobos


Re: Failure Restart Strategy leads to error

2022-01-26 Thread Siddhesh Kalgaonkar
Hi Yun and Oran,

Thanks for your time. Much appreciated!

Below are my configs:

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)
env.enableCheckpointing(2000)

//env.setDefaultSavepointDirectory("file:home/siddhesh/Desktop/savepoints/")

env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.setStateBackend(new FsStateBackend("file:home/siddhesh/Desktop/flink/"))
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000) // Gap after which the next checkpoint can be written
env.getCheckpointConfig.setCheckpointTimeout(4000) // Checkpoints have to complete within 4 secs
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) // Only 1 checkpoint can be executed at a time
env.getCheckpointConfig.enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // Checkpoints are retained if the job is cancelled explicitly
/*env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // max failures per unit
  org.apache.flink.api.common.time.Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
  org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // delay
))*/

Consumer properties:

val consumer = new FlinkKafkaConsumer[String]("topic_name", new SimpleStringSchema(), getProperties()) // Setting up the consumer properties

def getProperties(): Properties = {
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("zookeeper.connect", "localhost:2181")
  properties.setProperty("isolation.level", "read_committed")
  return properties
}

Also, you can see that I have commented out setting up my own savepoint
directory. It was also leading to some error and was causing the job to end
abruptly. Do I need to set it up while running via the CLI, or is there
something I am missing for the failure restart strategy and the savepoints
directory?

Thanks,
Sid


On Wed, Jan 26, 2022 at 1:52 PM Yun Tang  wrote:

> Hi Siddhesh,
>
> The root cause is that the configuration of group.id is missing for the
> Flink program. The configuration of restart strategy has no relationship
> with this.
>
> I think you should pay your attention to kafka related configurations.
>
>
> Best
> Yun Tang
> --
> *From:* Siddhesh Kalgaonkar 
> *Sent:* Wednesday, January 26, 2022 3:17
> *To:* user 
> *Subject:* Failure Restart Strategy leads to error
>
> I have Flink Kafka Consumer in place which works fine until I add the
> below lines:
>
> env.setRestartStrategy(RestartStrategies.failureRateRestart(
>   3, // max failures per unit
>   Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
>   Time.of(10, TimeUnit.SECONDS) // delay
> ))
>
> It gives me the below error stack trace:
>
> DEBUG [flink-akka.actor.default-dispatcher-14] (JobMaster.java:1119) -
> Close ResourceManager connection 05d80aa9f3aca06faf7be80bbc8a0642.
> org.apache.flink.util.FlinkException: Stopping JobMaster for job Flink
> Kafka Example(b425ae91bfb0e81980b878b3e4392137).
> at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:400)
> at
> org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> DEBUG [flink-akka.actor.default-dispatcher-12]
> (DefaultJobLeaderIdService.java:148) - Remove job
> b425ae91bfb0e81980b878b3e4392137 from job leader id monitoring.
>  INFO [flink-akka.actor.default-dispatcher-12] (ResourceManager.java:1047)
> - Disconnect job manager 
> a95c280817468866d08c3230ecd0462f@akka://flink/user/rpc/jobmanager_3
> 

Re: Reading performance - Kafka VS FileSystem

2022-01-26 Thread Yun Tang
Hi Jasmin,

From my knowledge, it seems no big company would adopt a pure file system source
as the main data source of Flink. We would in general choose a message queue,
e.g. Kafka, as the data source.

Best
Yun Tang

From: Jasmin Redžepović 
Sent: Wednesday, January 26, 2022 23:13
To: user@flink.apache.org 
Subject: Re: Reading performance - Kafka VS FileSystem

Also, what would you recommend? I have both options available:

  *   Kafka - protobuf messages
  *   S3 - here are messages copied from kafka for persistence with Kafka 
Connect service

On 26.01.2022., at 14:43, Jasmin Redžepović wrote:

Hello Flink committers :)

Just one short question:
How is performance of reading from Kafka source compared to reading from 
FileSystem source? I would be very grateful if you could provide a short 
explanation.

I saw in documentation that both provide exactly-once semantics for streaming, 
but this sentence about FileSystem got me thinking about performance: “For any 
repeated enumeration, the SplitEnumerator filters out previously detected files 
and only sends new ones to the SourceReader.”  - is this filtering slowing down 
reading if there are more and more files?

p.s. I’m new to the Flink

Thanks for your help and Best regards,
Jasmin





Re: Reading performance - Kafka VS FileSystem

2022-01-26 Thread Jasmin Redžepović
Also, what would you recommend? I have both options available:

  *   Kafka - protobuf messages
  *   S3 - here are messages copied from kafka for persistence with Kafka 
Connect service

On 26.01.2022., at 14:43, Jasmin Redžepović wrote:

Hello Flink committers :)

Just one short question:
How is performance of reading from Kafka source compared to reading from 
FileSystem source? I would be very grateful if you could provide a short 
explanation.

I saw in documentation that both provide exactly-once semantics for streaming, 
but this sentence about FileSystem got me thinking about performance: “For any 
repeated enumeration, the SplitEnumerator filters out previously detected files 
and only sends new ones to the SourceReader.”  - is this filtering slowing down 
reading if there are more and more files?

p.s. I’m new to the Flink

Thanks for your help and Best regards,
Jasmin





Re: create savepoint on bounded source in streaming mode

2022-01-26 Thread Shawn Du
DataStream API

--
Sender:Guowei Ma 
Sent At:2022 Jan. 26 (Wed.) 21:51
Recipient:Shawn Du 
Cc:user 
Subject:Re: create savepoint on bounded source in streaming mode

Hi, Shawn

Thank you for your sharing. Unfortunately I do not think there is an easy way 
to achieve this now.
Actually we have a customer who has the same requirement but the scenario is a 
little different. The bounded and unbounded pipeline have some differences but 
the customer wants reuse some state of the bounded pipeline.
Another question is what the api does the pipelined use? DataStream or SQL

Best,
Guowei

On Wed, Jan 26, 2022 at 8:58 PM Shawn Du  wrote:
   right!
--
Sender:Guowei Ma 
Sent At:2022 Jan. 26 (Wed.) 19:50
Recipient:Shawn Du 
Cc:user 
Subject:Re: create savepoint on bounded source in streaming mode

Hi,Shawn

You want to use the correct state(n-1) for day n-1 and the full amount of data 
for day n to produce the correct state(n) for day n.
 Then use state(n) to initialize a job to process the data for day n+1.
 Am I understanding this correctly?

Best,
Guowei

On Wednesday, January 26, 2022 at 7:15 PM, Shawn Du wrote:
Hi Gaowei,

think the case:
we have one streaming application built by flink, but kinds of reason, 
the event may be disordered or delayed terribly.
we want to replay the data day by day(the data was processed like 
reordered.). it looks like a batching job but with state. we want to use the 
same code for replaying.
thus we need persist the state for next job. any ideas?

Thanks
Shawn


--
Sender:Guowei Ma 
Sent At:2022 Jan. 26 (Wed.) 15:39
Recipient:Shawn Du 
Cc:user 
Subject:Re: create savepoint on bounded source in streaming mode

Hi Shawn
Currently Flink can not trigger the sp at the end of the input. An alternative 
way might be that you need to develop a customized source, which triggers a 
savepoint when it notices that all the input split has been handled.
Or you could see the state process api[1], which might be helpful.

Thanks for your sharing but I have another little question:
I think you need to process all the historical events to rebuild the correct 
state. So there might be no gain even if you periodically create a savepoint. 
So why did you need to "rebuild" the state periodically? Do I miss something?

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/
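
(For illustration only: a rough sketch of bootstrapping keyed state with the
State Processor API from [1], using the Flink 1.14 DataSet-based writer; the
Account type, operator uid, and savepoint path are assumptions.)

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

public class BootstrapSavepointFromHistory {

    // Assumed POJO standing in for a replayed historical record.
    public static class Account {
        public Integer id;
        public Double amount;
        public Account() {}
        public Account(Integer id, Double amount) { this.id = id; this.amount = amount; }
    }

    // Writes one keyed ValueState entry per key, as the streaming job would.
    public static class AccountBootstrapper extends KeyedStateBootstrapFunction<Integer, Account> {
        ValueState<Double> total;

        @Override
        public void open(Configuration parameters) {
            total = getRuntimeContext().getState(new ValueStateDescriptor<>("total", Double.class));
        }

        @Override
        public void processElement(Account value, Context ctx) throws Exception {
            Double current = total.value();
            total.update(current == null ? value.amount : current + value.amount);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // In practice this would read the historical events, e.g. from S3.
        DataSet<Account> history = env.fromElements(new Account(1, 100.0), new Account(2, 42.0));

        BootstrapTransformation<Account> transformation = OperatorTransformation
                .bootstrapWith(history)
                .keyBy(acc -> acc.id)
                .transform(new AccountBootstrapper());

        Savepoint
                .create(new MemoryStateBackend(), 128) // 128 = max parallelism of the target job
                .withOperator("accounts", transformation) // must match the uid of the streaming operator
                .write("file:///tmp/bootstrapped-savepoint");

        // Depending on the Flink version, an explicit env.execute() may be
        // needed to actually trigger the write.
        env.execute("bootstrap-savepoint");
    }
}
```

The resulting savepoint can then be used to start the streaming job (e.g. with
`flink run -s <savepoint-path>`).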

Best,
Guowei

On Wed, Jan 26, 2022 at 2:17 PM Shawn Du  wrote:

   our application is stateful. processing live events depends on the 
state. but for kinds of reason, we need rebuild the state. it will be very 
costly to replay all data.
   our historical events data are stored in s3. so we want to create 
states/savepoints periodically so that we can rebuild the state from a point. 
we call this as a bootstrap process.
   any ideas?

   Thanks.
--
Sender:Guowei Ma 
Sent At:2022 Jan. 26 (Wed.) 14:04
Recipient:Shawn Du 
Cc:user 
Subject:Re: create savepoint on bounded source in streaming mode

Hi, Shawn
I think Flink does not support this mechanism yet. 
Would you like to share the scenario in which you need this savepoint at the 
end of the bounded input?
Best,
Guowei

On Wed, Jan 26, 2022 at 1:50 PM Shawn Du  wrote:
Hi experts,

assume I have several files and I want replay these files in order in streaming 
mode and create a savepoint when files play at the end. it is possible?
I wrote a simple test app, and job are finished when source is at the end. I 
have no chance to creat a savepoint. please help.

Thanks
Shawn


-- 
Best,
Guowei




Re: create savepoint on bounded source in streaming mode

2022-01-26 Thread Guowei Ma
Hi, Shawn

Thank you for sharing. Unfortunately, I do not think there is an easy
way to achieve this now.
Actually, we have a customer with the same requirement, but the scenario
is a little different: the bounded and unbounded pipelines have some
differences, yet the customer wants to reuse some state of the bounded pipeline.
Another question: which API does the pipeline use, DataStream or SQL?

Best,
Guowei


On Wed, Jan 26, 2022 at 8:58 PM Shawn Du  wrote:

>right!
>
> --
> Sender:Guowei Ma 
> Sent At:2022 Jan. 26 (Wed.) 19:50
> Recipient:Shawn Du 
> Cc:user 
> Subject:Re: create savepoint on bounded source in streaming mode
>
> Hi,Shawn
>
> You want to use the correct state(n-1) for day n-1 and the full amount of
> data for day n to produce the correct state(n) for day n.
>  Then use state(n) to initialize a job to process the data for day n+1.
>  Am I understanding this correctly?
>
> Best,
> Guowei
>
On Wednesday, January 26, 2022 at 7:15 PM, Shawn Du wrote:
> Hi Gaowei,
>
> think the case:
> we have one streaming application built by flink, but kinds of
> reason, the event may be disordered or delayed terribly.
> we want to replay the data day by day(the data was processed like
> reordered.). it looks like a batching job but with state. we want to use
> the same code for replaying.
> thus we need persist the state for next job. any ideas?
>
> Thanks
> Shawn
>
>
> --
> Sender:Guowei Ma 
> Sent At:2022 Jan. 26 (Wed.) 15:39
> Recipient:Shawn Du 
> Cc:user 
> Subject:Re: create savepoint on bounded source in streaming mode
>
> Hi Shawn
> Currently Flink can not trigger the sp at the end of the input. An
> alternative way might be that you need to develop a customized source,
> which triggers a savepoint when it notices that all the input split has
> been handled.
> Or you could see the state process api[1], which might be helpful.
>
> Thanks for your sharing but I have another little question:
> I think you need to process all the historical events to rebuild the
> correct state. So there might be no gain even if you periodically create a
> savepoint. So why did you need to "rebuild" the state periodically? Do I
> miss something?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/
>
> Best,
> Guowei
>
>
> On Wed, Jan 26, 2022 at 2:17 PM Shawn Du  wrote:
>
>our application is stateful. processing live events depends on the
> state. but for kinds of reason, we need rebuild the state. it will be very
> costly to replay all data.
>our historical events data are stored in s3. so we want to create
> states/savepoints periodically so that we can rebuild the state from a
> point. we call this as a bootstrap process.
>any ideas?
>
>Thanks.
> --
> Sender:Guowei Ma 
> Sent At:2022 Jan. 26 (Wed.) 14:04
> Recipient:Shawn Du 
> Cc:user 
> Subject:Re: create savepoint on bounded source in streaming mode
>
> Hi, Shawn
> I think Flink does not support this mechanism yet.
> Would you like to share the scenario in which you need this savepoint at
> the end of the bounded input?
> Best,
> Guowei
>
>
> On Wed, Jan 26, 2022 at 1:50 PM Shawn Du  wrote:
> Hi experts,
>
> assume I have several files and I want replay these files in order in
> streaming mode and create a savepoint when files play at the end. it is
> possible?
> I wrote a simple test app, and job are finished when source is at the end.
> I have no chance to creat a savepoint. please help.
>
> Thanks
> Shawn
>
>
> --
> Best,
> Guowei
>
>
>


Reading performance - Kafka VS FileSystem

2022-01-26 Thread Jasmin Redžepović
Hello Flink committers :)

Just one short question:
How is performance of reading from Kafka source compared to reading from 
FileSystem source? I would be very grateful if you could provide a short 
explanation.

I saw in documentation that both provide exactly-once semantics for streaming, 
but this sentence about FileSystem got me thinking about performance: “For any 
repeated enumeration, the SplitEnumerator filters out previously detected files 
and only sends new ones to the SourceReader.”  - is this filtering slowing down 
reading if there are more and more files?

p.s. I’m new to the Flink

Thanks for your help and Best regards,
Jasmin




Re: create savepoint on bounded source in streaming mode

2022-01-26 Thread Shawn Du
   right!
--
Sender:Guowei Ma 
Sent At:2022 Jan. 26 (Wed.) 19:50
Recipient:Shawn Du 
Cc:user 
Subject:Re: create savepoint on bounded source in streaming mode

Hi,Shawn

You want to use the correct state(n-1) for day n-1 and the full amount of data 
for day n to produce the correct state(n) for day n.
 Then use state(n) to initialize a job to process the data for day n+1.
 Am I understanding this correctly?

Best,
Guowei

On Wednesday, January 26, 2022 at 7:15 PM, Shawn Du wrote:
Hi Gaowei,

think the case:
we have one streaming application built by flink, but kinds of reason, 
the event may be disordered or delayed terribly.
we want to replay the data day by day(the data was processed like 
reordered.). it looks like a batching job but with state. we want to use the 
same code for replaying.
thus we need persist the state for next job. any ideas?

Thanks
Shawn


--
Sender:Guowei Ma 
Sent At:2022 Jan. 26 (Wed.) 15:39
Recipient:Shawn Du 
Cc:user 
Subject:Re: create savepoint on bounded source in streaming mode

Hi Shawn
Currently Flink can not trigger the sp at the end of the input. An alternative 
way might be that you need to develop a customized source, which triggers a 
savepoint when it notices that all the input split has been handled.
Or you could see the state process api[1], which might be helpful.

Thanks for your sharing but I have another little question:
I think you need to process all the historical events to rebuild the correct 
state. So there might be no gain even if you periodically create a savepoint. 
So why did you need to "rebuild" the state periodically? Do I miss something?

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/

Best,
Guowei

On Wed, Jan 26, 2022 at 2:17 PM Shawn Du  wrote:

   our application is stateful. processing live events depends on the 
state. but for kinds of reason, we need rebuild the state. it will be very 
costly to replay all data.
   our historical events data are stored in s3. so we want to create 
states/savepoints periodically so that we can rebuild the state from a point. 
we call this as a bootstrap process.
   any ideas?

   Thanks.
--
Sender:Guowei Ma 
Sent At:2022 Jan. 26 (Wed.) 14:04
Recipient:Shawn Du 
Cc:user 
Subject:Re: create savepoint on bounded source in streaming mode

Hi, Shawn
I think Flink does not support this mechanism yet. 
Would you like to share the scenario in which you need this savepoint at the 
end of the bounded input?
Best,
Guowei

On Wed, Jan 26, 2022 at 1:50 PM Shawn Du  wrote:
Hi experts,

assume I have several files and I want replay these files in order in streaming 
mode and create a savepoint when files play at the end. it is possible?
I wrote a simple test app, and job are finished when source is at the end. I 
have no chance to creat a savepoint. please help.

Thanks
Shawn


-- 
Best,
Guowei



Re: create savepoint on bounded source in streaming mode

2022-01-26 Thread Guowei Ma
Hi,Shawn

You want to use the correct state(n-1) for day n-1 and the full amount of
data for day n to produce the correct state(n) for day n.
 Then use state(n) to initialize a job to process the data for day n+1.
 Am I understanding this correctly?

Best,
Guowei

On Wednesday, January 26, 2022 at 7:15 PM, Shawn Du wrote:

> Hi Gaowei,
>
> think the case:
> we have one streaming application built by flink, but kinds of
> reason, the event may be disordered or delayed terribly.
> we want to replay the data day by day(the data was processed like
> reordered.). it looks like a batching job but with state. we want to use
> the same code for replaying.
> thus we need persist the state for next job. any ideas?
>
> Thanks
> Shawn
>
>
> --
> Sender:Guowei Ma 
> Sent At:2022 Jan. 26 (Wed.) 15:39
>
> Recipient:Shawn Du 
> Cc:user 
> Subject:Re: create savepoint on bounded source in streaming mode
>
> Hi Shawn
> Currently Flink can not trigger the sp at the end of the input. An
> alternative way might be that you need to develop a customized source,
> which triggers a savepoint when it notices that all the input split has
> been handled.
> Or you could see the state process api[1], which might be helpful.
>
> Thanks for your sharing but I have another little question:
> I think you need to process all the historical events to rebuild the
> correct state. So there might be no gain even if you periodically create a
> savepoint. So why did you need to "rebuild" the state periodically? Do I
> miss something?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/
>
> Best,
> Guowei
>
>
> On Wed, Jan 26, 2022 at 2:17 PM Shawn Du  wrote:
>
>our application is stateful. processing live events depends on the
> state. but for kinds of reason, we need rebuild the state. it will be very
> costly to replay all data.
>our historical events data are stored in s3. so we want to create
> states/savepoints periodically so that we can rebuild the state from a
> point. we call this as a bootstrap process.
>any ideas?
>
>Thanks.
> --
> Sender:Guowei Ma 
> Sent At:2022 Jan. 26 (Wed.) 14:04
> Recipient:Shawn Du 
> Cc:user 
> Subject:Re: create savepoint on bounded source in streaming mode
>
> Hi, Shawn
> I think Flink does not support this mechanism yet.
> Would you like to share the scenario in which you need this savepoint at
> the end of the bounded input?
> Best,
> Guowei
>
>
> On Wed, Jan 26, 2022 at 1:50 PM Shawn Du  wrote:
> Hi experts,
>
> assume I have several files and I want replay these files in order in
> streaming mode and create a savepoint when files play at the end. it is
> possible?
> I wrote a simple test app, and job are finished when source is at the end.
> I have no chance to creat a savepoint. please help.
>
> Thanks
> Shawn
>
>
> --
Best,
Guowei


Resolving a CatalogTable

2022-01-26 Thread Balázs Varga
Hi everyone,

I'm trying to migrate from the old set of CatalogTable related APIs
(CatalogTableImpl, TableSchema, DescriptorProperties) to the new ones
(CatalogBaseTable, Schema and ResolvedSchema, CatalogPropertiesUtil), in a
custom catalog.

The catalog stores table definitions, and the current logic involves
persisting the
schema from a CatalogBaseTable to a database. When we get a table, its
definition is read from the database and the CatalogTable is built up and
returned.

For this, we currently serialize the schema like this:
descriptorProperties.putTableSchema(Schema.SCHEMA,
catalogBaseTable.getSchema());

The new API seems to intentionally only allow the serialization of the
Resolved version of objects (e.g. ResolvedCatalogTable, ResolvedSchema).

1. Could you please clarify why this limitation was put into place? It
seems to me that it would
be sufficient to resolve the CatalogTables once we are actually trying to
pass the table to the DynamicTableFactory.

2. What additional information is gained during the resolution of a
CatalogTable, and where does that information come from? Are there some
references to things in other catalogs?

3. Is it possible to "manually" resolve a CatalogTable? (invoke something
like what the internal DefaultSchemaResolver does). What context is
required?

Thanks,
Balazs


Re: create savepoint on bounded source in streaming mode

2022-01-26 Thread Shawn Du
Hi Gaowei,

Think about this case:
We have one streaming application built with Flink, but for various reasons
the events may arrive badly out of order or terribly delayed.
We want to replay the data day by day (with the data reordered). It looks like
a batch job, but with state, and we want to use the same code for replaying.
Thus we need to persist the state for the next job. Any ideas?

Thanks
Shawn



--
Sender:Guowei Ma 
Sent At:2022 Jan. 26 (Wed.) 15:39
Recipient:Shawn Du 
Cc:user 
Subject:Re: create savepoint on bounded source in streaming mode

Hi Shawn
Currently Flink can not trigger the sp at the end of the input. An alternative 
way might be that you need to develop a customized source, which triggers a 
savepoint when it notices that all the input split has been handled.
Or you could see the state process api[1], which might be helpful.

Thanks for your sharing but I have another little question:
I think you need to process all the historical events to rebuild the correct 
state. So there might be no gain even if you periodically create a savepoint. 
So why did you need to "rebuild" the state periodically? Do I miss something?

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/

Best,
Guowei

On Wed, Jan 26, 2022 at 2:17 PM Shawn Du  wrote:

   our application is stateful. processing live events depends on the 
state. but for kinds of reason, we need rebuild the state. it will be very 
costly to replay all data.
   our historical events data are stored in s3. so we want to create 
states/savepoints periodically so that we can rebuild the state from a point. 
we call this as a bootstrap process.
   any ideas?

   Thanks.
--
Sender:Guowei Ma 
Sent At:2022 Jan. 26 (Wed.) 14:04
Recipient:Shawn Du 
Cc:user 
Subject:Re: create savepoint on bounded source in streaming mode

Hi, Shawn
I think Flink does not support this mechanism yet. 
Would you like to share the scenario in which you need this savepoint at the 
end of the bounded input?
Best,
Guowei

On Wed, Jan 26, 2022 at 1:50 PM Shawn Du  wrote:
Hi experts,

assume I have several files and I want replay these files in order in streaming 
mode and create a savepoint when files play at the end. it is possible?
I wrote a simple test app, and job are finished when source is at the end. I 
have no chance to creat a savepoint. please help.

Thanks
Shawn




Re: How to run in IDE?

2022-01-26 Thread Chesnay Schepler
We will need more of the log's contents to help you (preferably the whole
thing).


On 25/01/2022 23:55, John Smith wrote:
I'm using: final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();


But no go.

On Mon, 24 Jan 2022 at 16:35, John Smith  wrote:

Hi, using Flink 1.14.3 with Gradle. I explicitly added the flink
client dependency and the job starts, but it quits with...

In Flink 1.10 the job worked as is. How do I set the number of
slots, and are there any other settings for the IDE?

16:29:50,633 INFO
 org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager
 - Received resource requirements from job
3a3e9c46da413071392bce161c39270f:
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN},
numberOfRequiredSlots=2}]
16:29:50,633 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph    - Sink:
Print to Std. Out (14/16) (d7c4fbf5f23f3118e54998f2b35338c1)
switched from CANCELING to CANCELED.
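
In case it helps while gathering the logs, a minimal sketch of pinning the slot
count for the embedded MiniCluster used when running from the IDE (the slot and
parallelism numbers are only examples):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IdeLocalEnv {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Give the single local TaskManager enough slots for the job's parallelism.
        conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 16);

        // getExecutionEnvironment(Configuration) starts a local MiniCluster
        // with this configuration when the job runs inside the IDE.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(16);

        // ... job definition ...
        env.execute("ide-local-job");
    }
}
```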



Flink-ML: Sink model data in online training

2022-01-26 Thread thekingofcity
Hi,

I want to sink the model data (the coefficients from the logistic regression
model, in my case) from the flink.ml.api.Model to print or to a file. I figured
out how to sink it in batch training mode, but I face the following exception
when the Estimator takes an UNBOUNDED datastream.

```

Caused by: java.lang.IllegalStateException: There can be only a single consumer 
in a FeedbackChannel.
at 
org.apache.flink.statefun.flink.core.feedback.FeedbackChannel.registerConsumer(FeedbackChannel.java:79)
```

This happens if I dump it through the Table API like this:

```
final TableDescriptor sinkDescriptor = TableDescriptor
        .forConnector("print")
        .schema(Schema.newBuilder()
                .column("coefficient", DataTypes.of(new DenseVectorTypeInfo()))
                .build())
        .build();
tEnv.createTemporaryTable("ModelSink", sinkDescriptor);
model.getModelData()[0].executeInsert("ModelSink");
```

Looking for an example that can sink the model data in online training mode.

With many thanks,
thekingofcity

flinkcdc 2.1????

2022-01-26 Thread ??????
??flinkcdc 2.1.1

Re: Failure Restart Strategy leads to error

2022-01-26 Thread Yun Tang
Hi Siddhesh,

The root cause is that the group.id configuration is missing for the Flink
program. The restart strategy configuration has no relationship with this.

I think you should pay attention to the Kafka-related configurations.
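
For illustration, a sketch of adding the missing group.id to the consumer
properties (the group name is just a placeholder; topic and brokers are as in
the original snippet):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-kafka-example"); // any stable consumer group name
properties.setProperty("isolation.level", "read_committed");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("topic_name", new SimpleStringSchema(), properties);
```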


Best
Yun Tang

From: Siddhesh Kalgaonkar 
Sent: Wednesday, January 26, 2022 3:17
To: user 
Subject: Failure Restart Strategy leads to error

I have Flink Kafka Consumer in place which works fine until I add the below 
lines:

env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // max failures per unit
Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
Time.of(10, TimeUnit.SECONDS) // delay
))

It gives me the below error stack trace:

DEBUG [flink-akka.actor.default-dispatcher-14] (JobMaster.java:1119) - Close 
ResourceManager connection 05d80aa9f3aca06faf7be80bbc8a0642.
org.apache.flink.util.FlinkException: Stopping JobMaster for job Flink Kafka 
Example(b425ae91bfb0e81980b878b3e4392137).
at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:400)
at 
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
DEBUG [flink-akka.actor.default-dispatcher-12] 
(DefaultJobLeaderIdService.java:148) - Remove job 
b425ae91bfb0e81980b878b3e4392137 from job leader id monitoring.
 INFO [flink-akka.actor.default-dispatcher-12] (ResourceManager.java:1047) - 
Disconnect job manager 
a95c280817468866d08c3230ecd0462f@akka://flink/user/rpc/jobmanager_3 for job 
b425ae91bfb0e81980b878b3e4392137 from the resource manager.
DEBUG [flink-akka.actor.default-dispatcher-12] (DefaultResourceTracker.java:80) 
- Initiating tracking of resources for job b425ae91bfb0e81980b878b3e4392137.
DEBUG [flink-akka.actor.default-dispatcher-12] (DefaultResourceTracker.java:60) 
- Stopping tracking of resources for job b425ae91bfb0e81980b878b3e4392137.
DEBUG [flink-akka.actor.default-dispatcher-14] (AkkaRpcActor.java:131) - The 
RpcEndpoint jobmanager_3 terminated successfully.
 INFO [flink-akka.actor.default-dispatcher-8] 
(DefaultJobLeaderService.java:136) - Stop job leader service.
 INFO [flink-akka.actor.default-dispatcher-8] 
(TaskExecutorLocalStateStoresManager.java:231) - Shutting down 
TaskExecutorLocalStateStoresManager.
DEBUG [flink-akka.actor.default-dispatcher-8] (IOManagerAsync.java:121) - 
Shutting down I/O manager.
Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at 

Re: create savepoint on bounded source in streaming mode

2022-01-26 Thread Shawn Du
   Cool! HybridSource seems very close to my requirements.
   Thanks Dawid, I will give it a try.

   Shawn
--
Sender:Dawid Wysakowicz 
Sent At:2022 Jan. 26 (Wed.) 15:49
Recipient:user 
Subject:Re: create savepoint on bounded source in streaming mode

Hi Shawn,
You could also take a look at the hybrid source[1]
Best,
Dawid
[1]https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/
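
(For illustration, a rough sketch of the hybrid source from [1] applied to this
use case - reading the historical files first and then switching to Kafka;
class names follow the Flink 1.14 connectors, and the paths, topic, and
timestamp are assumptions.)

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HistoryThenLive {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Assumed: historical events exported to files (e.g. on S3) as text lines.
        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(new TextLineFormat(), new Path("s3://bucket/history/"))
                .build();

        // Assumed switch point: continue from Kafka where the files end.
        long switchTimestamp = 1_640_995_200_000L;
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("events")
                .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Bounded file source first, then the unbounded Kafka source, in one job.
        HybridSource<String> hybridSource = HybridSource.builder(fileSource)
                .addSource(kafkaSource)
                .build();

        env.fromSource(hybridSource, WatermarkStrategy.forMonotonousTimestamps(), "history-then-live")
           .print();

        env.execute("hybrid-source-replay");
    }
}
```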
On 26/01/2022 08:39, Guowei Ma wrote:
Hi Shawn 
Currently Flink can not trigger the sp at the end of the input. An alternative 
way might be that you need to develop a customized source, which triggers a 
savepoint when it notices that all the input split has been handled.
Or you could see the state process api[1], which might be helpful.

Thanks for your sharing but I have another little question:
I think you need to process all the historical events to rebuild the correct 
state. So there might be no gain even if you periodically create a savepoint. 
So why did you need to "rebuild" the state periodically? Do I miss something?

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/

Best,
Guowei 

On Wed, Jan 26, 2022 at 2:17 PM Shawn Du  wrote:
 
   our application is stateful. processing live events depends on the 
state. but for kinds of reason, we need rebuild the state. it will be very 
costly to replay all data.
   our historical events data are stored in s3. so we want to create 
states/savepoints periodically so that we can rebuild the state from a point. 
we call this as a bootstrap process.
   any ideas?

   Thanks.
--
Sender:Guowei Ma 
Sent At:2022 Jan. 26 (Wed.) 14:04
Recipient:Shawn Du 
Cc:user 
Subject:Re: create savepoint on bounded source in streaming mode

Hi, Shawn 
I think Flink does not support this mechanism yet. 
Would you like to share the scenario in which you need this savepoint at the 
end of the bounded input?
Best, 
Guowei  

On Wed, Jan 26, 2022 at 1:50 PM Shawn Du  wrote:
Hi experts,

assume I have several files and I want replay these files in order in streaming 
mode and create a savepoint when files play at the end. it is possible?
I wrote a simple test app, and job are finished when source is at the end. I 
have no chance to creat a savepoint. please help.

Thanks
Shawn