hbase NoSuchMethodError: org.apache.hadoop.hbase.client.HTable.getTableName()[B
Hi, the hbase-client package is 2.1.0 and Flink is 1.12.4. The HBase code (the input format extends TableInputFormat) is as follows:

try {
    connection = ConnectionFactory.createConnection(hbaseConf);
    // Table table = connection.getTable(TableName.valueOf(tableName));
    table = (HTable) connection.getTable(TableName.valueOf(tableName));
} catch (IOException e) {
    logger.error("HBase connection error", e.getCause());
    System.out.println("--");
}
System.out.println("--aaa");
scan = new Scan().addFamily(Bytes.toBytes(family));
scan.withStartRow(startRow.getBytes());
scan.withStopRow(endRow.getBytes());
System.out.println("--");

The error is as follows:

Exception in thread "main" org.apache.flink.util.FlinkException: Failed to execute job 'Flink Streaming Job'.
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1796)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1765)
	at com.example.app.hbasesource.main(hbasesource.java:25)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.
	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
	at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: org.apache.hadoop.hbase.client.HTable.getTableName()[B
	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:260)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:866)
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:257)
	at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:322)
	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:276)
	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:133)
	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)
	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:342)
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:327)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)
	at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:162)
	at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478)
	... 4 more
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.HTable.getTableName()[B
	at org.apache.flink.addons.hbase.AbstractTableInputFormat.createInputSplits(AbstractTableInputFormat.java:232)
	at org.apache.flink.addons.hbase.AbstractTableInputFormat.createInputSplits(AbstractTableInputFormat.java:44)
	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:247)
	... 18 more
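The trace shows that `createInputSplits` is called from `org.apache.flink.addons.hbase.AbstractTableInputFormat`, which comes from the legacy connector built against HBase 1.x, where `HTable.getTableName()` still returned a `byte[]`; hbase-client 2.1.0 no longer has that method, hence the NoSuchMethodError. A possible fix, assuming a Maven build with Scala 2.11 artifacts, is to extend the input format from the HBase 2.2 connector that ships with Flink 1.12 instead of the old addons one:

```xml
<!-- A sketch, assuming Maven and Scala 2.11: replace the legacy flink-hbase /
     flink-addons dependency (compiled against HBase 1.x) with the connector
     compiled against HBase 2.x, so the input format no longer calls
     HTable.getTableName(). -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase-2.2_2.11</artifactId>
    <version>1.12.4</version>
</dependency>
```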
Questions about checkpoint retention
Hi all,

To minimize recovery time from failures, we use incremental, retained checkpoints with `state.checkpoints.num-retained` set to 10 in our Flink apps. With this setting, Flink automatically creates new checkpoints regularly and keeps only the latest 10. In addition, for app upgrades and better reliability, we have a cron job that creates savepoints at regular intervals. We have two questions about checkpoint retention.

1. When our cron job creates a savepoint called SP, it seems the checkpoints created earlier than SP still cannot be deleted. We assumed new checkpoints would be generated based on SP, making the checkpoints from before SP useless. However, the checkpoint mechanism doesn't seem to work the way we thought. Is our assumption correct?

2. To save storage cost, we'd like to know which checkpoints can be deleted. Currently, each version of our app has 10 checkpoints. Can we delete the checkpoints generated by previous versions of our apps?

Any comment is appreciated!

Best wishes,
Chen-Che

An example is below (a checkpoint is generated every 30 minutes, while a savepoint is created every 2 hours):

1:00 Flink creates checkpoint
1:30 Flink creates checkpoint
2:00 Flink creates checkpoint
2:30 Flink creates checkpoint
3:00 Cron job creates savepoint (SP)
3:30 Flink creates checkpoint
4:00 Flink creates checkpoint
...
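For reference, the retention setup described above corresponds to configuration along these lines (a minimal sketch; the checkpoint directory and backend choice are placeholders, not taken from the original mail):

```yaml
# A sketch of the flink-conf.yaml entries behind the setup described above;
# the directory is a placeholder.
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: s3://my-bucket/checkpoints
state.checkpoints.num-retained: 10
execution.checkpointing.interval: 30 min
```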
Re: Is it possible to support many different windows for many different keys?
> > Again, thank you for your input.

You are welcome.

> I want the stream element to define the window.

Got it, that was the missing bit of detail. That is also doable - not with the Windows API, but with the lower-level ProcessFunction. Check out my blog post [1], especially its third part [2]. Window handling in that case is driven by external rules rather than by the original events themselves, but this material should give you enough inspiration to implement the custom logic you need.

[1] https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
[2] https://flink.apache.org/news/2020/07/30/demo-fraud-detection-3.html

Best,
Alexander Fedulov

On Wed, Jan 26, 2022 at 10:45 PM Marco Villalobos wrote:

> Hi Alexander,
>
> Thank you for responding. The solution you proposed uses statically
> defined windows. What I need are dynamically created windows determined
> by metadata in the stream element.
>
> I want the stream element to define the window.
>
> That’s what I’m trying to research, or an alternate solution.
>
> Again, thank you for your input.
>
> On Jan 26, 2022, at 1:32 PM, Alexander Fedulov wrote:
>
> Hi Marco,
>
> Not sure if I get your problem correctly, but you can process those
> windows on data "split" from the same input within the same Flink job.
> Something along these lines:
>
> DataStream stream = ...
> DataStream a = stream.filter( /* time series name == "a" */);
> a.keyBy(...).window(TumblingEventTimeWindows.of(Time.minutes(1)));
>
> DataStream b = stream.filter( /* time series name == "b" */);
> b.keyBy(...).window(TumblingEventTimeWindows.of(Time.minutes(5)));
>
> If needed, you can then union all of the separate result streams together.
> a.union(b, c ...);
>
> There is no need for separate Flink deployments to create such a pipeline.
>
> Best,
> Alexander Fedulov
>
> On Wed, Jan 26, 2022 at 6:47 PM Marco Villalobos <mvillalo...@kineteque.com> wrote:
>
>> Hi,
>>
>> I am working with time series data in the form of (timestamp, name,
>> value), with an event time that is the timestamp when the data was published
>> onto Kafka, and I have a business requirement in which each stream element
>> becomes enriched, and then processing requires different time series names
>> to be processed in different windows with different time averages.
>>
>> For example, a time series with name "a" might require a one-minute window
>> and a five-minute window.
>>
>> A time series with name "b" requires no windowing.
>>
>> A time series with name "c" requires a two-minute window and a ten-minute
>> window.
>>
>> Does Flink support this style of windowing? I think it doesn't. Also,
>> does any streaming platform support that type of windowing?
>>
>> I was thinking that this type of windowing support might require a
>> different Flink deployment for each window. Would that scale, though, if
>> there are tens of thousands of time series names / windows?
>>
>> Any help or advice would be appreciated. Thank you.
>>
>> Marco A. Villalobos
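To make the ProcessFunction route concrete, here is a minimal sketch (not from the blog posts above) of a KeyedProcessFunction where the first element of a window carries the window length itself and an event-time timer fires to emit the result; the `Event` and `Result` types, their fields, and the running-sum aggregate are all illustrative assumptions:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// A sketch: each element defines its own window length (Event.windowMillis).
// Event(name, timestamp, value, windowMillis) and Result are assumed POJOs.
public class ElementDefinedWindowFunction
        extends KeyedProcessFunction<String, Event, Result> {

    private transient ValueState<Double> sum;      // running aggregate
    private transient ValueState<Long> windowEnd;  // end of the currently open window

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sum", Types.DOUBLE));
        windowEnd = getRuntimeContext().getState(
                new ValueStateDescriptor<>("windowEnd", Types.LONG));
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<Result> out)
            throws Exception {
        if (windowEnd.value() == null) {
            // First element of a new window: its metadata defines the size.
            long end = e.timestamp + e.windowMillis;
            windowEnd.update(end);
            ctx.timerService().registerEventTimeTimer(end);
        }
        double current = sum.value() == null ? 0.0 : sum.value();
        sum.update(current + e.value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out)
            throws Exception {
        // The watermark passed the element-defined window end: emit and reset.
        out.collect(new Result(ctx.getCurrentKey(), timestamp, sum.value()));
        sum.clear();
        windowEnd.clear();
    }
}
```

Keyed by the time series name, this keeps exactly one open window per key; supporting overlapping windows per key would need a MapState keyed by window end instead of the two ValueStates.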
Re: Is it possible to support many different windows for many different keys?
Hi Alexander,

Thank you for responding. The solution you proposed uses statically defined windows. What I need are dynamically created windows determined by metadata in the stream element.

I want the stream element to define the window.

That’s what I’m trying to research, or an alternate solution.

Again, thank you for your input.

> On Jan 26, 2022, at 1:32 PM, Alexander Fedulov wrote:
>
> Hi Marco,
>
> Not sure if I get your problem correctly, but you can process those windows
> on data "split" from the same input within the same Flink job.
> Something along these lines:
>
> DataStream stream = ...
> DataStream a = stream.filter( /* time series name == "a" */);
> a.keyBy(...).window(TumblingEventTimeWindows.of(Time.minutes(1)));
>
> DataStream b = stream.filter( /* time series name == "b" */);
> b.keyBy(...).window(TumblingEventTimeWindows.of(Time.minutes(5)));
>
> If needed, you can then union all of the separate result streams together.
> a.union(b, c ...);
>
> There is no need for separate Flink deployments to create such a pipeline.
>
> Best,
> Alexander Fedulov
>
>> On Wed, Jan 26, 2022 at 6:47 PM Marco Villalobos wrote:
>>
>> Hi,
>>
>> I am working with time series data in the form of (timestamp, name, value),
>> with an event time that is the timestamp when the data was published onto
>> Kafka, and I have a business requirement in which each stream element
>> becomes enriched, and then processing requires different time series names
>> to be processed in different windows with different time averages.
>>
>> For example, a time series with name "a" might require a one-minute window
>> and a five-minute window.
>>
>> A time series with name "b" requires no windowing.
>>
>> A time series with name "c" requires a two-minute window and a ten-minute window.
>>
>> Does Flink support this style of windowing? I think it doesn't. Also, does
>> any streaming platform support that type of windowing?
>>
>> I was thinking that this type of windowing support might require a different
>> Flink deployment for each window. Would that scale, though, if there are
>> tens of thousands of time series names / windows?
>>
>> Any help or advice would be appreciated. Thank you.
>>
>> Marco A. Villalobos
Re: Is it possible to support many different windows for many different keys?
Hi Marco,

Not sure if I get your problem correctly, but you can process those windows on data "split" from the same input within the same Flink job. Something along these lines:

DataStream stream = ...
DataStream a = stream.filter( /* time series name == "a" */);
a.keyBy(...).window(TumblingEventTimeWindows.of(Time.minutes(1)));

DataStream b = stream.filter( /* time series name == "b" */);
b.keyBy(...).window(TumblingEventTimeWindows.of(Time.minutes(5)));

If needed, you can then union all of the separate result streams together.
a.union(b, c ...);

There is no need for separate Flink deployments to create such a pipeline.

Best,
Alexander Fedulov

On Wed, Jan 26, 2022 at 6:47 PM Marco Villalobos wrote:

> Hi,
>
> I am working with time series data in the form of (timestamp, name, value),
> with an event time that is the timestamp when the data was published onto
> Kafka, and I have a business requirement in which each stream element
> becomes enriched, and then processing requires different time series names
> to be processed in different windows with different time averages.
>
> For example, a time series with name "a" might require a one-minute window
> and a five-minute window.
>
> A time series with name "b" requires no windowing.
>
> A time series with name "c" requires a two-minute window and a ten-minute
> window.
>
> Does Flink support this style of windowing? I think it doesn't. Also,
> does any streaming platform support that type of windowing?
>
> I was thinking that this type of windowing support might require a
> different Flink deployment for each window. Would that scale, though, if
> there are tens of thousands of time series names / windows?
>
> Any help or advice would be appreciated. Thank you.
>
> Marco A. Villalobos
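Spelled out with explicit types, the pattern above might look as follows; the `Event` POJO, its fields, the key, and the sum-reduce are placeholders rather than anything from the original message:

```java
// A sketch of the split-and-window pattern; Event(name, value) is an assumed POJO
// and "source" stands for your enriched input.
DataStream<Event> stream = env.addSource(source);

DataStream<Event> a = stream
        .filter(e -> e.name.equals("a"))
        .keyBy(e -> e.name)
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .reduce((x, y) -> new Event(x.name, x.value + y.value));

DataStream<Event> b = stream
        .filter(e -> e.name.equals("b"))
        .keyBy(e -> e.name)
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .reduce((x, y) -> new Event(x.name, x.value + y.value));

// Union the per-window result streams back together if needed.
DataStream<Event> results = a.union(b);
```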
Unbounded streaming with table API and large json as one of the columns
Hi,

I need to calculate elapsed times between the steps of a transaction. Each step is an event, and all steps belonging to a single transaction have the same transaction id. Every event has a handling time. All information is part of a large JSON structure, but I can have the incoming source supply transactionId and handlingTime separately; that would save me retrieving the windowing key (transactionId) and handlingTime out of the nested JSON.

Basically I want to use the SQL API to do:

select transactionId
     , handlingTime - previousHandlingTime as elapsedTime
     , largeJSON
from (
  select transactionId
       , handlingTime
       , lag(handlingTime) over (partition by transactionId order by handlingTime) as previousHandlingTime
       , largeJSON
  from source
)

The largeJSON can be about 100K. Would this work?

Regards,
Hans-Peter
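For what it's worth, the query can be wired up through the Table API along these lines (a sketch only; it assumes the `source` table is already registered with `handlingTime` as an event-time attribute, and that the Flink version in use supports LAG in streaming OVER aggregations):

```java
// A sketch: "source" and its columns are assumed to be registered already.
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Table elapsed = tEnv.sqlQuery(
    "SELECT transactionId, " +
    "       handlingTime - previousHandlingTime AS elapsedTime, " +
    "       largeJSON " +
    "FROM ( " +
    "  SELECT transactionId, handlingTime, largeJSON, " +
    "         LAG(handlingTime) OVER ( " +
    "           PARTITION BY transactionId ORDER BY handlingTime) AS previousHandlingTime " +
    "  FROM source)");
```

One thing to keep in mind: a streaming OVER aggregation keeps rows in state, so a ~100K largeJSON column per row mostly adds serialization and state-size cost rather than changing the logic.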
Is it possible to support many different windows for many different keys?
Hi,

I am working with time series data in the form of (timestamp, name, value), with an event time that is the timestamp when the data was published onto Kafka, and I have a business requirement in which each stream element becomes enriched, and then processing requires different time series names to be processed in different windows with different time averages.

For example, a time series with name "a" might require a one-minute window and a five-minute window.

A time series with name "b" requires no windowing.

A time series with name "c" requires a two-minute window and a ten-minute window.

Does Flink support this style of windowing? I think it doesn't. Also, does any streaming platform support that type of windowing?

I was thinking that this type of windowing support might require a different Flink deployment for each window. Would that scale, though, if there are tens of thousands of time series names / windows?

Any help or advice would be appreciated. Thank you.

Marco A. Villalobos
Re: Failure Restart Strategy leads to error
Hi Yun and Oran,

Thanks for your time. Much appreciated! Below are my configs:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(2000)
//env.setDefaultSavepointDirectory("file:home/siddhesh/Desktop/savepoints/")
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.setStateBackend(new FsStateBackend(("file:home/siddhesh/Desktop/flink/")))
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000) // Gap after which the next checkpoint can be written
env.getCheckpointConfig.setCheckpointTimeout(4000) // Checkpoints have to complete within 4 secs
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) // Only 1 checkpoint can be executed at a time
env.getCheckpointConfig.enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // Checkpoints are retained if the job is cancelled explicitly
/*env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // max failures per unit
  org.apache.flink.api.common.time.Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
  org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // delay
))*/

Consumer properties:

val consumer = new FlinkKafkaConsumer[String]("topic_name", new SimpleStringSchema(), getProperties())

// Setting up the consumer properties
def getProperties(): Properties = {
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("zookeeper.connect", "localhost:2181")
  properties.setProperty("isolation.level", "read_committed")
  return properties
}

Also, you can see that I have commented out setting up my own savepoint directory. It was also leading to an error and causing the job to end abruptly. Do I need to set it up while running via the CLI, or is there something I am missing for the failure restart strategy and the savepoint directory?

Thanks,
Sid

On Wed, Jan 26, 2022 at 1:52 PM Yun Tang wrote:

> Hi Siddhesh,
>
> The root cause is that the configuration of group.id is missing for the
> Flink program. The configuration of the restart strategy has no relationship
> with this.
>
> I think you should pay attention to the Kafka-related configurations.
>
> Best
> Yun Tang
> --
> From: Siddhesh Kalgaonkar
> Sent: Wednesday, January 26, 2022 3:17
> To: user
> Subject: Failure Restart Strategy leads to error
>
> I have a Flink Kafka Consumer in place which works fine until I add the
> below lines:
>
> env.setRestartStrategy(RestartStrategies.failureRateRestart(
>   3, // max failures per unit
>   Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
>   Time.of(10, TimeUnit.SECONDS) // delay
> ))
>
> It gives me the below error stack trace:
>
> DEBUG [flink-akka.actor.default-dispatcher-14] (JobMaster.java:1119) -
> Close ResourceManager connection 05d80aa9f3aca06faf7be80bbc8a0642.
> org.apache.flink.util.FlinkException: Stopping JobMaster for job Flink
> Kafka Example(b425ae91bfb0e81980b878b3e4392137).
> at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:400)
> at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> DEBUG [flink-akka.actor.default-dispatcher-12] (DefaultJobLeaderIdService.java:148) - Remove job b425ae91bfb0e81980b878b3e4392137 from job leader id monitoring.
> INFO [flink-akka.actor.default-dispatcher-12] (ResourceManager.java:1047) - Disconnect job manager a95c280817468866d08c3230ecd0462f@akka://flink/user/rpc/jobmanager_3
>
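Picking up Yun Tang's hint about the missing group.id: the consumer properties above never set one, so a fix could be as simple as the following sketch (the group name is a placeholder, not a value from the thread):

```scala
// A sketch: the same getProperties() with the missing consumer group id added.
// "my-flink-consumer-group" is a placeholder.
def getProperties(): Properties = {
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("zookeeper.connect", "localhost:2181")
  properties.setProperty("group.id", "my-flink-consumer-group")
  properties.setProperty("isolation.level", "read_committed")
  properties
}
```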
Re: Reading performance - Kafka VS FileSystem
Hi Jasmin,

From my knowledge, it seems no big company would adopt a pure file-system source as the main data source of Flink. We would in general choose a message queue, e.g. Kafka, as the data source.

Best
Yun Tang

From: Jasmin Redžepović
Sent: Wednesday, January 26, 2022 23:13
To: user@flink.apache.org
Subject: Re: Reading performance - Kafka VS FileSystem

Also, what would you recommend? I have both options available:

* Kafka - protobuf messages
* S3 - here messages are copied from Kafka for persistence with the Kafka Connect service

On 26.01.2022., at 14:43, Jasmin Redžepović <jasmin.redzepo...@superbet.com> wrote:

Hello Flink committers :)

Just one short question: how does the performance of reading from a Kafka source compare to reading from a FileSystem source? I would be very grateful if you could provide a short explanation.

I saw in the documentation that both provide exactly-once semantics for streaming, but this sentence about FileSystem got me thinking about performance: “For any repeated enumeration, the SplitEnumerator filters out previously detected files and only sends new ones to the SourceReader.” - is this filtering slowing down reading if there are more and more files?

p.s. I’m new to Flink

Thanks for your help and best regards,
Jasmin
Re: Reading performance - Kafka VS FileSystem
Also, what would you recommend? I have both options available:

* Kafka - protobuf messages
* S3 - here messages are copied from Kafka for persistence with the Kafka Connect service

On 26.01.2022., at 14:43, Jasmin Redžepović <jasmin.redzepo...@superbet.com> wrote:

Hello Flink committers :)

Just one short question: how does the performance of reading from a Kafka source compare to reading from a FileSystem source? I would be very grateful if you could provide a short explanation.

I saw in the documentation that both provide exactly-once semantics for streaming, but this sentence about FileSystem got me thinking about performance: “For any repeated enumeration, the SplitEnumerator filters out previously detected files and only sends new ones to the SourceReader.” - is this filtering slowing down reading if there are more and more files?

p.s. I’m new to Flink

Thanks for your help and best regards,
Jasmin
Re: create savepoint on bounded source in streaming mode
DataStream API -- Sender:Guowei Ma Sent At:2022 Jan. 26 (Wed.) 21:51 Recipient:Shawn Du Cc:user Subject:Re: create savepoint on bounded source in streaming mode Hi, Shawn Thank you for your sharing. Unfortunately I do not think there is an easy way to achieve this now. Actually we have a customer who has the same requirement but the scenario is a little different. The bounded and unbounded pipeline have some differences but the customer wants reuse some state of the bounded pipeline. Another question is what the api does the pipelined use? DataStream or SQL Best, Guowei On Wed, Jan 26, 2022 at 8:58 PM Shawn Du wrote: right! -- Sender:Guowei Ma Sent At:2022 Jan. 26 (Wed.) 19:50 Recipient:Shawn Du Cc:user Subject:Re: create savepoint on bounded source in streaming mode Hi,Shawn You want to use the correct state(n-1) for day n-1 and the full amount of data for day n to produce the correct state(n) for day n. Then use state(n) to initialize a job to process the data for day n+1. Am I understanding this correctly? Best, Guowei Shawn Du 于2022年1月26日 周三下午7:15写道: Hi Gaowei, think the case: we have one streaming application built by flink, but kinds of reason, the event may be disordered or delayed terribly. we want to replay the data day by day(the data was processed like reordered.). it looks like a batching job but with state. we want to use the same code for replaying. thus we need persist the state for next job. any ideas? Thanks Shawn -- Sender:Guowei Ma Sent At:2022 Jan. 26 (Wed.) 15:39 Recipient:Shawn Du Cc:user Subject:Re: create savepoint on bounded source in streaming mode Hi Shawn Currently Flink can not trigger the sp at the end of the input. An alternative way might be that you need to develop a customized source, which triggers a savepoint when it notices that all the input split has been handled. Or you could see the state process api[1], which might be helpful. Thanks for your sharing but I have another little question: I think you need to process all the historical events to rebuild the correct state. So there might be no gain even if you periodically create a savepoint. So why did you need to "rebuild" the state periodically? Do I miss something? [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/ Best, Guowei On Wed, Jan 26, 2022 at 2:17 PM Shawn Du wrote: our application is stateful. processing live events depends on the state. but for kinds of reason, we need rebuild the state. it will be very costly to replay all data. our historical events data are stored in s3. so we want to create states/savepoints periodically so that we can rebuild the state from a point. we call this as a bootstrap process. any ideas? Thanks. -- Sender:Guowei Ma Sent At:2022 Jan. 26 (Wed.) 14:04 Recipient:Shawn Du Cc:user Subject:Re: create savepoint on bounded source in streaming mode Hi, Shawn I think Flink does not support this mechanism yet. Would you like to share the scenario in which you need this savepoint at the end of the bounded input? Best, Guowei On Wed, Jan 26, 2022 at 1:50 PM Shawn Du wrote: Hi experts, assume I have several files and I want replay these files in order in streaming mode and create a savepoint when files play at the end. it is possible? I wrote a simple test app, and job are finished when source is at the end. I have no chance to creat a savepoint. please help. Thanks Shawn -- Best, Guowei
Re: create savepoint on bounded source in streaming mode
Hi, Shawn

Thank you for sharing. Unfortunately, I do not think there is an easy way to achieve this now. Actually, we have a customer who has the same requirement, but the scenario is a little different: the bounded and unbounded pipelines have some differences, and the customer wants to reuse some state of the bounded pipeline.

Another question: which API does the pipeline use, DataStream or SQL?

Best,
Guowei

On Wed, Jan 26, 2022 at 8:58 PM Shawn Du wrote:

> right!
>
> --
> Sender: Guowei Ma
> Sent At: 2022 Jan. 26 (Wed.) 19:50
> Recipient: Shawn Du
> Cc: user
> Subject: Re: create savepoint on bounded source in streaming mode
>
> Hi, Shawn
>
> You want to use the correct state(n-1) for day n-1 and the full amount of
> data for day n to produce the correct state(n) for day n.
> Then use state(n) to initialize a job to process the data for day n+1.
> Am I understanding this correctly?
>
> Best,
> Guowei
>
> Shawn Du wrote on Wed, Jan 26, 2022 at 7:15 PM:
>
> Hi Guowei,
>
> think the case:
> we have one streaming application built by flink, but kinds of
> reason, the event may be disordered or delayed terribly.
> we want to replay the data day by day (the data was processed like
> reordered). it looks like a batching job but with state. we want to use
> the same code for replaying.
> thus we need persist the state for next job. any ideas?
>
> Thanks
> Shawn
>
> --
> Sender: Guowei Ma
> Sent At: 2022 Jan. 26 (Wed.) 15:39
> Recipient: Shawn Du
> Cc: user
> Subject: Re: create savepoint on bounded source in streaming mode
>
> Hi Shawn
> Currently Flink can not trigger the sp at the end of the input. An
> alternative way might be that you need to develop a customized source,
> which triggers a savepoint when it notices that all the input splits have
> been handled.
> Or you could see the state processor api [1], which might be helpful.
>
> Thanks for your sharing but I have another little question:
> I think you need to process all the historical events to rebuild the
> correct state. So there might be no gain even if you periodically create a
> savepoint. So why did you need to "rebuild" the state periodically? Do I
> miss something?
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/
>
> Best,
> Guowei
>
> On Wed, Jan 26, 2022 at 2:17 PM Shawn Du wrote:
>
> our application is stateful. processing live events depends on the
> state. but for kinds of reason, we need rebuild the state. it will be very
> costly to replay all data.
> our historical events data are stored in s3. so we want to create
> states/savepoints periodically so that we can rebuild the state from a
> point. we call this as a bootstrap process.
> any ideas?
>
> Thanks.
>
> --
> Sender: Guowei Ma
> Sent At: 2022 Jan. 26 (Wed.) 14:04
> Recipient: Shawn Du
> Cc: user
> Subject: Re: create savepoint on bounded source in streaming mode
>
> Hi, Shawn
> I think Flink does not support this mechanism yet.
> Would you like to share the scenario in which you need this savepoint at
> the end of the bounded input?
> Best,
> Guowei
>
> On Wed, Jan 26, 2022 at 1:50 PM Shawn Du wrote:
>
> Hi experts,
>
> assume I have several files and I want to replay these files in order in
> streaming mode and create a savepoint when the files play to the end. Is it
> possible?
> I wrote a simple test app, and the job finishes when the source is at the
> end. I have no chance to create a savepoint. please help.
>
> Thanks
> Shawn
Reading performance - Kafka VS FileSystem
Hello Flink committers :)

Just one short question: how does the performance of reading from a Kafka source compare to reading from a FileSystem source? I would be very grateful if you could provide a short explanation.

I saw in the documentation that both provide exactly-once semantics for streaming, but this sentence about FileSystem got me thinking about performance: “For any repeated enumeration, the SplitEnumerator filters out previously detected files and only sends new ones to the SourceReader.” - is this filtering slowing down reading if there are more and more files?

p.s. I’m new to Flink

Thanks for your help and best regards,
Jasmin
Re: create savepoint on bounded source in streaming mode
right! -- Sender:Guowei Ma Sent At:2022 Jan. 26 (Wed.) 19:50 Recipient:Shawn Du Cc:user Subject:Re: create savepoint on bounded source in streaming mode Hi,Shawn You want to use the correct state(n-1) for day n-1 and the full amount of data for day n to produce the correct state(n) for day n. Then use state(n) to initialize a job to process the data for day n+1. Am I understanding this correctly? Best, Guowei Shawn Du 于2022年1月26日 周三下午7:15写道: Hi Gaowei, think the case: we have one streaming application built by flink, but kinds of reason, the event may be disordered or delayed terribly. we want to replay the data day by day(the data was processed like reordered.). it looks like a batching job but with state. we want to use the same code for replaying. thus we need persist the state for next job. any ideas? Thanks Shawn -- Sender:Guowei Ma Sent At:2022 Jan. 26 (Wed.) 15:39 Recipient:Shawn Du Cc:user Subject:Re: create savepoint on bounded source in streaming mode Hi Shawn Currently Flink can not trigger the sp at the end of the input. An alternative way might be that you need to develop a customized source, which triggers a savepoint when it notices that all the input split has been handled. Or you could see the state process api[1], which might be helpful. Thanks for your sharing but I have another little question: I think you need to process all the historical events to rebuild the correct state. So there might be no gain even if you periodically create a savepoint. So why did you need to "rebuild" the state periodically? Do I miss something? [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/ Best, Guowei On Wed, Jan 26, 2022 at 2:17 PM Shawn Du wrote: our application is stateful. processing live events depends on the state. but for kinds of reason, we need rebuild the state. it will be very costly to replay all data. our historical events data are stored in s3. so we want to create states/savepoints periodically so that we can rebuild the state from a point. we call this as a bootstrap process. any ideas? Thanks. -- Sender:Guowei Ma Sent At:2022 Jan. 26 (Wed.) 14:04 Recipient:Shawn Du Cc:user Subject:Re: create savepoint on bounded source in streaming mode Hi, Shawn I think Flink does not support this mechanism yet. Would you like to share the scenario in which you need this savepoint at the end of the bounded input? Best, Guowei On Wed, Jan 26, 2022 at 1:50 PM Shawn Du wrote: Hi experts, assume I have several files and I want replay these files in order in streaming mode and create a savepoint when files play at the end. it is possible? I wrote a simple test app, and job are finished when source is at the end. I have no chance to creat a savepoint. please help. Thanks Shawn -- Best, Guowei
Re: create savepoint on bounded source in streaming mode
Hi, Shawn

You want to use the correct state(n-1) for day n-1 and the full amount of data for day n to produce the correct state(n) for day n. Then use state(n) to initialize a job to process the data for day n+1. Am I understanding this correctly?

Best,
Guowei

Shawn Du wrote on Wed, Jan 26, 2022 at 7:15 PM:

> Hi Guowei,
>
> think the case:
> we have one streaming application built by flink, but kinds of
> reason, the event may be disordered or delayed terribly.
> we want to replay the data day by day (the data was processed like
> reordered). it looks like a batching job but with state. we want to use
> the same code for replaying.
> thus we need persist the state for next job. any ideas?
>
> Thanks
> Shawn
>
> --
> Sender: Guowei Ma
> Sent At: 2022 Jan. 26 (Wed.) 15:39
> Recipient: Shawn Du
> Cc: user
> Subject: Re: create savepoint on bounded source in streaming mode
>
> Hi Shawn
> Currently Flink can not trigger the sp at the end of the input. An
> alternative way might be that you need to develop a customized source,
> which triggers a savepoint when it notices that all the input splits have
> been handled.
> Or you could see the state processor api [1], which might be helpful.
>
> Thanks for your sharing but I have another little question:
> I think you need to process all the historical events to rebuild the
> correct state. So there might be no gain even if you periodically create a
> savepoint. So why did you need to "rebuild" the state periodically? Do I
> miss something?
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/
>
> Best,
> Guowei
>
> On Wed, Jan 26, 2022 at 2:17 PM Shawn Du wrote:
>
> our application is stateful. processing live events depends on the
> state. but for kinds of reason, we need rebuild the state. it will be very
> costly to replay all data.
> our historical events data are stored in s3. so we want to create
> states/savepoints periodically so that we can rebuild the state from a
> point. we call this as a bootstrap process.
> any ideas?
>
> Thanks.
>
> --
> Sender: Guowei Ma
> Sent At: 2022 Jan. 26 (Wed.) 14:04
> Recipient: Shawn Du
> Cc: user
> Subject: Re: create savepoint on bounded source in streaming mode
>
> Hi, Shawn
> I think Flink does not support this mechanism yet.
> Would you like to share the scenario in which you need this savepoint at
> the end of the bounded input?
> Best,
> Guowei
>
> On Wed, Jan 26, 2022 at 1:50 PM Shawn Du wrote:
>
> Hi experts,
>
> assume I have several files and I want to replay these files in order in
> streaming mode and create a savepoint when the files play to the end. Is it
> possible?
> I wrote a simple test app, and the job finishes when the source is at the
> end. I have no chance to create a savepoint. please help.
>
> Thanks
> Shawn

--
Best,
Guowei
Resolving a CatalogTable
Hi everyone,

I'm trying to migrate from the old set of CatalogTable-related APIs (CatalogTableImpl, TableSchema, DescriptorProperties) to the new ones (CatalogBaseTable, Schema and ResolvedSchema, CatalogPropertiesUtil) in a custom catalog. The catalog stores table definitions, and the current logic involves persisting the schema from a CatalogBaseTable to a database. When we get a table, its definition is read from the database and the CatalogTable is built up and returned. For this, we currently serialize the schema like this:

descriptorProperties.putTableSchema(Schema.SCHEMA, catalogBaseTable.getSchema());

The new API seems to intentionally only allow the serialization of the Resolved version of objects (e.g. ResolvedCatalogTable, ResolvedSchema).

1. Could you please clarify why this limitation was put into place? It seems to me that it would be sufficient to resolve the CatalogTables once we are actually trying to pass the table to the DynamicTableFactory.

2. What additional information is gained during the resolution of a CatalogTable, and where does that information come from? Are there some references to things in other catalogs?

3. Is it possible to "manually" resolve a CatalogTable (invoke something like what the internal DefaultSchemaResolver does)? What context is required?

Thanks,
Balazs
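Regarding question 3, below is a sketch of what the round trip could look like for a table with only physical columns (no computed columns or watermarks, which would need expression resolution against a full catalog manager); the column names and options map are illustrative, and the exact API surface may differ between Flink versions:

```java
// A sketch, assuming physical columns only; names are illustrative.
ResolvedSchema resolvedSchema = ResolvedSchema.of(
        Column.physical("user_id", DataTypes.BIGINT()),
        Column.physical("payload", DataTypes.STRING()));

ResolvedCatalogTable resolved = new ResolvedCatalogTable(
        CatalogTable.of(
                Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
                "a comment",
                Collections.emptyList(),  // partition keys
                options),                 // connector options (Map<String, String>)
        resolvedSchema);

// Flat key/value map that can be persisted to the database...
Map<String, String> properties = CatalogPropertiesUtil.serializeCatalogTable(resolved);

// ...and turned back into an (unresolved) CatalogTable in getTable().
CatalogTable roundTripped = CatalogPropertiesUtil.deserializeCatalogTable(properties);
```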
Re: create savepoint on bounded source in streaming mode
Hi Guowei,

Consider this case: we have one streaming application built with Flink, but for various reasons the events may arrive badly out of order or terribly delayed. We want to replay the data day by day (so the data is processed as if reordered). It looks like a batch job, but with state, and we want to use the same code for replaying. Thus we need to persist the state for the next job. Any ideas?

Thanks
Shawn

--
Sender: Guowei Ma
Sent At: 2022 Jan. 26 (Wed.) 15:39
Recipient: Shawn Du
Cc: user
Subject: Re: create savepoint on bounded source in streaming mode

Hi Shawn

Currently Flink can not trigger the sp at the end of the input. An alternative way might be that you need to develop a customized source, which triggers a savepoint when it notices that all the input splits have been handled. Or you could see the state processor api [1], which might be helpful.

Thanks for your sharing but I have another little question: I think you need to process all the historical events to rebuild the correct state. So there might be no gain even if you periodically create a savepoint. So why did you need to "rebuild" the state periodically? Do I miss something?

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/

Best,
Guowei

On Wed, Jan 26, 2022 at 2:17 PM Shawn Du wrote:

our application is stateful. processing live events depends on the state. but for kinds of reason, we need rebuild the state. it will be very costly to replay all data. our historical events data are stored in s3. so we want to create states/savepoints periodically so that we can rebuild the state from a point. we call this as a bootstrap process. any ideas?

Thanks.

--
Sender: Guowei Ma
Sent At: 2022 Jan. 26 (Wed.) 14:04
Recipient: Shawn Du
Cc: user
Subject: Re: create savepoint on bounded source in streaming mode

Hi, Shawn

I think Flink does not support this mechanism yet. Would you like to share the scenario in which you need this savepoint at the end of the bounded input?

Best,
Guowei

On Wed, Jan 26, 2022 at 1:50 PM Shawn Du wrote:

Hi experts,

assume I have several files and I want to replay these files in order in streaming mode and create a savepoint when the files play to the end. Is it possible? I wrote a simple test app, and the job finishes when the source is at the end. I have no chance to create a savepoint. please help.

Thanks
Shawn
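For the bootstrap process described above, the State Processor API that Guowei linked can write the initial savepoint from a batch read of the historical data. A rough sketch against the 1.14-era (DataSet-based) API follows; the Account type, its parsing, the state layout, and the operator uid are all made-up examples:

```java
// A sketch of bootstrapping keyed state into a savepoint (Flink 1.14-era API).
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Hypothetical load of historical events from S3; Account.parse is made up.
DataSet<Account> historical =
        env.readTextFile("s3://my-bucket/events/").map(Account::parse);

BootstrapTransformation<Account> transformation = OperatorTransformation
        .bootstrapWith(historical)
        .keyBy(acc -> acc.id)
        .transform(new KeyedStateBootstrapFunction<String, Account>() {
            ValueState<Double> balance;

            @Override
            public void open(Configuration parameters) {
                balance = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("balance", Types.DOUBLE));
            }

            @Override
            public void processElement(Account value, Context ctx) throws Exception {
                balance.update(value.balance);
            }
        });

// "my-operator-uid" must match the uid of the stateful operator in the streaming job.
Savepoint.create(new EmbeddedRocksDBStateBackend(), 128)
        .withOperator("my-operator-uid", transformation)
        .write("s3://my-bucket/bootstrapped-savepoint");

env.execute("bootstrap");
```

The streaming job can then be started from that written savepoint, which avoids replaying all historical data on every restart.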
Re: How to run in IDE?
We will need more of the log contents to help you (preferably the whole thing).

On 25/01/2022 23:55, John Smith wrote:

I'm using:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

But no go.

On Mon, 24 Jan 2022 at 16:35, John Smith wrote:

Hi, using Flink 1.14.3 with Gradle. I explicitly added the Flink client dependency and the job starts, but it quits with the output below. In Flink 1.10 the job worked as is. How do I set the number of slots, and are there any other settings for the IDE?

16:29:50,633 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager - Received resource requirements from job 3a3e9c46da413071392bce161c39270f: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=2}]
16:29:50,633 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Print to Std. Out (14/16) (d7c4fbf5f23f3118e54998f2b35338c1) switched from CANCELING to CANCELED.
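One common cause, offered only as a guess from the "(14/16)" subtask above: the job runs with parallelism 16 while the embedded MiniCluster has fewer slots. A sketch of pinning the slot count for IDE runs (Flink 1.14 APIs):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IdeRunner {
    public static void main(String[] args) throws Exception {
        // Give the embedded MiniCluster enough slots for the job's parallelism
        // (16 here, matching the "Sink: Print to Std. Out (14/16)" task above).
        Configuration conf = new Configuration();
        conf.set(TaskManagerOptions.NUM_TASK_SLOTS, 16);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.fromElements(1, 2, 3).print();  // placeholder pipeline
        env.execute("ide-test");
    }
}
```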
Flink-ML: Sink model data in online training
Hi,

I want to sink the model data (the coefficients from the logistic regression model, in my case) from the flink.ml.api.Model to print or to a file. I figured out how to sink it in batch training mode, but I face the following exception when the Estimator takes an UNBOUNDED datastream:

```
Caused by: java.lang.IllegalStateException: There can be only a single consumer in a FeedbackChannel.
	at org.apache.flink.statefun.flink.core.feedback.FeedbackChannel.registerConsumer(FeedbackChannel.java:79)
```

This happens if I dump it through the Table API like this:

```
final TableDescriptor sinkDescriptor = TableDescriptor
    .forConnector("print")
    .schema(Schema
        .newBuilder()
        .column("coefficient", DataTypes.of(new DenseVectorTypeInfo()))
        .build()
    ).build();
tEnv.createTemporaryTable("ModelSink", sinkDescriptor);
model.getModelData()[0].executeInsert("ModelSink");
```

I am looking for an example that can sink the model data in online training mode.

With many thanks,
thekingofcity
flinkcdc 2.1????
??flinkcdc 2.1.1
Re: Failure Restart Strategy leads to error
Hi Siddhesh,

The root cause is that the group.id configuration is missing for the Flink program. The restart strategy configuration has nothing to do with this.

I think you should pay attention to the Kafka-related configurations.

Best
Yun Tang

From: Siddhesh Kalgaonkar
Sent: Wednesday, January 26, 2022 3:17
To: user
Subject: Failure Restart Strategy leads to error

I have a Flink Kafka Consumer in place which works fine until I add the below lines:

env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // max failures per unit
  Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
  Time.of(10, TimeUnit.SECONDS) // delay
))

It gives me the below error stack trace:

DEBUG [flink-akka.actor.default-dispatcher-14] (JobMaster.java:1119) - Close ResourceManager connection 05d80aa9f3aca06faf7be80bbc8a0642.
org.apache.flink.util.FlinkException: Stopping JobMaster for job Flink Kafka Example(b425ae91bfb0e81980b878b3e4392137).
	at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:400)
	at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
DEBUG [flink-akka.actor.default-dispatcher-12] (DefaultJobLeaderIdService.java:148) - Remove job b425ae91bfb0e81980b878b3e4392137 from job leader id monitoring.
INFO [flink-akka.actor.default-dispatcher-12] (ResourceManager.java:1047) - Disconnect job manager a95c280817468866d08c3230ecd0462f@akka://flink/user/rpc/jobmanager_3 for job b425ae91bfb0e81980b878b3e4392137 from the resource manager.
DEBUG [flink-akka.actor.default-dispatcher-12] (DefaultResourceTracker.java:80) - Initiating tracking of resources for job b425ae91bfb0e81980b878b3e4392137.
DEBUG [flink-akka.actor.default-dispatcher-12] (DefaultResourceTracker.java:60) - Stopping tracking of resources for job b425ae91bfb0e81980b878b3e4392137.
DEBUG [flink-akka.actor.default-dispatcher-14] (AkkaRpcActor.java:131) - The RpcEndpoint jobmanager_3 terminated successfully.
INFO [flink-akka.actor.default-dispatcher-8] (DefaultJobLeaderService.java:136) - Stop job leader service.
INFO [flink-akka.actor.default-dispatcher-8] (TaskExecutorLocalStateStoresManager.java:231) - Shutting down TaskExecutorLocalStateStoresManager.
DEBUG [flink-akka.actor.default-dispatcher-8] (IOManagerAsync.java:121) - Shutting down I/O manager.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
	at akka.dispatch.OnComplete.internal(Future.scala:264)
	at akka.dispatch.OnComplete.internal(Future.scala:261)
	at
Re: create savepoint on bounded source in streaming mode
Cool! HybridSource seems very close to my requirements. Thanks, Dawid. I will give it a try.

Shawn

--
Sender: Dawid Wysakowicz
Sent At: 2022 Jan. 26 (Wed.) 15:49
Recipient: user
Subject: Re: create savepoint on bounded source in streaming mode

Hi Shawn,

You could also take a look at the hybrid source [1].

Best,
Dawid

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/

On 26/01/2022 08:39, Guowei Ma wrote:

Hi Shawn

Currently Flink can not trigger the sp at the end of the input. An alternative way might be that you need to develop a customized source, which triggers a savepoint when it notices that all the input splits have been handled. Or you could see the state processor api [1], which might be helpful.

Thanks for your sharing but I have another little question: I think you need to process all the historical events to rebuild the correct state. So there might be no gain even if you periodically create a savepoint. So why did you need to "rebuild" the state periodically? Do I miss something?

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/

Best,
Guowei

On Wed, Jan 26, 2022 at 2:17 PM Shawn Du wrote:

our application is stateful. processing live events depends on the state. but for kinds of reason, we need rebuild the state. it will be very costly to replay all data. our historical events data are stored in s3. so we want to create states/savepoints periodically so that we can rebuild the state from a point. we call this as a bootstrap process. any ideas?

Thanks.

--
Sender: Guowei Ma
Sent At: 2022 Jan. 26 (Wed.) 14:04
Recipient: Shawn Du
Cc: user
Subject: Re: create savepoint on bounded source in streaming mode

Hi, Shawn

I think Flink does not support this mechanism yet. Would you like to share the scenario in which you need this savepoint at the end of the bounded input?

Best,
Guowei

On Wed, Jan 26, 2022 at 1:50 PM Shawn Du wrote:

Hi experts,

assume I have several files and I want to replay these files in order in streaming mode and create a savepoint when the files play to the end. Is it possible? I wrote a simple test app, and the job finishes when the source is at the end. I have no chance to create a savepoint. please help.

Thanks
Shawn
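For later readers, the basic shape of the hybrid source from [1], as a sketch against the 1.14-era APIs; the paths, topic, and broker are placeholders, and the exact file-format class name may differ between versions:

```java
// A sketch: replay historical files from S3 first, then switch over to Kafka.
// Paths, topic, broker, and the TextLineFormat reader are assumptions.
FileSource<String> fileSource = FileSource
        .forRecordStreamFormat(new TextLineFormat(), new Path("s3://my-bucket/events/"))
        .build();

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("events")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setStartingOffsets(OffsetsInitializer.earliest())
        .build();

HybridSource<String> hybrid = HybridSource.builder(fileSource)
        .addSource(kafkaSource)
        .build();

DataStream<String> stream = env.fromSource(
        hybrid, WatermarkStrategy.noWatermarks(), "hybrid-source");
```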