Re: Flink Event specific window

2021-04-25 Thread Arvid Heise
1. It always depends on the data volume per user. A million users is not
much compared to the biggest Flink installations (Netflix,
Alibaba, Pinterest, ...). However, for larger scale and better scalability, I'd
recommend using the RocksDB state backend. [1]

2. Are you referring to StateFun? I'd say that for your use case, Flink is
a better fit. StateFun is more suitable when each actor (= user in your
case) acts differently depending on the data, like in a state machine. In
your case, your users should be processed in the same way: even if the
windows are independently opened and closed, every user has at most
one window open at a given event time. You probably also aggregate all user
states more or less in the same way.

Or did you refer to process functions with state? That's certainly
possible to implement, but it won't be much faster unless you can exploit
some specific properties of your application. An example is given in [2].
I'd recommend first using the regular, built-in windows and only switching to
custom code if the performance is insufficient. Custom implementations may
be faster now, but they come with a higher maintenance cost, and the built-in
windows may be better optimized in the future.

Lastly, if your query is relational in nature, I'd recommend having a look
at the Table API/SQL [3]. Unless you really invest a lot of time, you won't be
able to write more efficient code than what the Table API generates.

[1] https://flink.apache.org/2021/01/18/rocksdb.html
[2] https://flink.apache.org/news/2020/07/30/demo-fraud-detection-3.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#session-session-windows
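
For reference, here is a minimal, untested sketch of the DataStream variant discussed in
this thread (the Txn POJO, its fields and the inline test data are assumptions made for
illustration, not something from the original job):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class HourlySpendPerCustomer {

    // Simple POJO standing in for the transaction events described below.
    public static class Txn {
        public String customerId;
        public double amount;
        public long eventTime; // epoch millis

        public Txn() {}

        public Txn(String customerId, double amount, long eventTime) {
            this.customerId = customerId;
            this.amount = amount;
            this.eventTime = eventTime;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Inline test data; in the real job this would be the Kafka source topic.
        DataStream<Txn> txns = env.fromElements(
                new Txn("customer1", 10.0, 1_000L),
                new Txn("customer1", 20.0, 2_000L));

        txns.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Txn>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                        .withTimestampAssigner((txn, ts) -> txn.eventTime))
            .keyBy(txn -> txn.customerId)
            // 1h window sliding every minute approximates "total spent in the last hour"
            .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(1)))
            .sum("amount")
            .print();

        env.execute("hourly-spend-per-customer");
    }
}

For the strict "same hour" interpretation mentioned above, TumblingEventTimeWindows.of(Time.hours(1))
can be swapped in for the sliding assigner.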

On Sun, Apr 25, 2021 at 11:46 PM Swagat Mishra  wrote:

>  1. What if there are a very high number of users, like a million
> customers, won't the service crash? Is it advisable to hold the data in
> memory?
>
> 2. What if state-functions are used to calculate the value ? How will this
> approach differ from the one proposed below.
>
> Regards,
> Swagat
>
> On Wed, Apr 21, 2021, 11:25 PM Arvid Heise  wrote:
>
>> Hi Sunitha,
>>
>> the approach you are describing sounds like you want to use a session
>> window. [1] If you only want to count them when they happen within the same hour,
>> then you want to use a tumbling window.
>>
>> Your datastream approach looks solid.
>>
>> For SQL, there is also a session (and tumbling) window [2]. You can see
>> examples at the bottom of the section.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#session-windows
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#group-windows
>>
>> On Tue, Apr 20, 2021 at 11:03 PM s_penakalap...@yahoo.com <
>> s_penakalap...@yahoo.com> wrote:
>>
>>> Hi All,
>>>
>>> I have one requirement where I need to calculate the total amount of
>>> transactions done by each user in the last 1 hour.
>>> Say Customer1 has done 2 transactions, one at 11:00 am and the other at
>>> 11:20 am.
>>> Customer2 has done 1 transaction, at 10:00 am.
>>> Customer3 has done 3 transactions, at 11:20 am, 11:40 am and 11:45
>>> am.
>>>
>>> Whenever a customer does a transaction we receive an event in the source
>>> topic; we consume the data and need to calculate the total amount spent by
>>> the customer in the last 1 hour.
>>>
>>> If I receive a new customer1 transaction event at 11:30 am, then I
>>> need to calculate the sum of the 3 events done by customer1 in the last 1 hour (i.e.
>>> 11:00, 11:20 and 11:30 am - all 3 events fall in the last 1 hour window).
>>> Now say I receive a new Customer2 transaction event at 11:30 am; then for
>>> this customer I need to consider only one event, 11:30 (ignoring the event
>>> at 10:00 am as it does not fall in the last 1 hr).
>>> If Customer3's new transaction is done at 12:40 pm, then for this customer I
>>> need to calculate the sum of (11:40 am, 11:45 am and 12:40 pm), as all 3 fall
>>> under the last 1 hr.
>>>
>>> Approach I am planning to try:
>>> Every event has the transaction time which I am using as event time to
>>> assign WatermarkStrategy
>>> KeyBy - customerId
>>> SlidingEventTimeWindows of 1 hr
>>> then process all elements using ProcessWindowFunction
>>>
>>>
>>> Kindly suggest the approach I need to follow to achieve the above
>>> scenario using Flink Java /Sql. I am using Flink 1.12.0.
>>>
>>> Regards,
>>> Sunitha
>>>
>>


Checkpoint error - "The job has failed"

2021-04-25 Thread Dan Hill
My Flink job failed to checkpoint with a "The job has failed" error.  The
logs contained no other recent errors.  I keep hitting the error even if I
cancel the jobs and restart them.  When I restarted my jobmanager and
taskmanager, the error went away.

What error am I hitting?  It looks like there is bad state that lives
outside the scope of a job.

How often do people restart their jobmanagers and taskmanagers to deal with
errors like this?


Re: Watermarks in Event Time Temporal Join

2021-04-25 Thread Shengkai Fang
Hi, maverick.

The watermark is used to determine whether a message is late or early. If we only
use the watermark on the versioned table side, we have no means to determine
whether the event in the main stream is ready to be emitted.
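
To make that concrete, here is a rough structural sketch via the Table API (table
names, columns and options are placeholders, and the connector clauses are left
incomplete, so treat this as illustration rather than a runnable job). Both sides
declare a WATERMARK, and the versioned side additionally declares a primary key:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TemporalJoinWatermarksSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Main (probe) side: its watermark tells Flink how far event time has
        // progressed in the stream being enriched.
        tEnv.executeSql(
            "CREATE TABLE orders (" +
            "  order_id STRING," +
            "  currency STRING," +
            "  amount DECIMAL(10, 2)," +
            "  order_time TIMESTAMP(3)," +
            "  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
            ") WITH ('connector' = '...')");   // placeholder connector

        // Versioned (build) side: primary key + watermark; this watermark is what
        // tells the join that a version of the row is complete and safe to use.
        tEnv.executeSql(
            "CREATE TABLE currency_rates (" +
            "  currency STRING," +
            "  rate DECIMAL(10, 4)," +
            "  update_time TIMESTAMP(3)," +
            "  WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND," +
            "  PRIMARY KEY (currency) NOT ENFORCED" +
            ") WITH ('connector' = '...')");   // typically a changelog/CDC source

        // A result row for an order is only emitted once the watermark on the
        // versioned side has also passed order_time, hence watermarks on both sides.
        tEnv.executeSql(
            "SELECT o.order_id, o.amount * r.rate AS converted_amount " +
            "FROM orders AS o " +
            "LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r " +
            "ON o.currency = r.currency");
    }
}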

Best,
Shengkai

maverick wrote on Mon, Apr 26, 2021 at 2:31 AM:

> Hi,
> I'm curious why Event Time Temporal Join needs watermarks from both sides
> to
> perform join.
>
> Shouldn't the watermark on the versioned table side be enough to perform the join?
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Backlog when consuming Kafka data with Flink

2021-04-25 Thread Back moon
Hi all,
We have a job that uses Flink to consume data from Kafka, aggregates metrics and writes
them to Redis. The job has been restarting frequently recently; the relevant exception log is as follows:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate the required slot within slot request timeout.
Please make sure that the cluster has enough resources.
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:452)
[flink-dist_2.11-1.10.1.jar:1.10.1]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$5(DefaultScheduler.java:433)
[flink-dist_2.11-1.10.1.jar:1.10.1]
at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
~[na:1.8.0_121]
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
~[na:1.8.0_121]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
~[na:1.8.0_121]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
~[na:1.8.0_121]
at 
org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:168)
~[flink-dist_2.11-1.10.1.jar:1.10.1]
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
~[na:1.8.0_121]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
~[na:1.8.0_121]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
~[na:1.8.0_121]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
~[na:1.8.0_121]
at 
org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:726)
~[flink-dist_2.11-1.10.1.jar:1.10.1]
at 
org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
~[flink-dist_2.11-1.10.1.jar:1.10.1]
at 
org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.lambda$new$0(SlotSharingManager.java:432)
~[flink-dist_2.11-1.10.1.jar:1.10.1]
at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
~[na:1.8.0_121]
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
~[na:1.8.0_121]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
~[na:1.8.0_121]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
~[na:1.8.0_121]
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$forward$21(FutureUtils.java:1065)
~[flink-dist_2.11-1.10.1.jar:1.10.1]
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
~[na:1.8.0_121]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
~[na:1.8.0_121]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
~[na:1.8.0_121]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
~[na:1.8.0_121]
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:999)
~[flink-dist_2.11-1.10.1.jar:1.10.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
~[flink-dist_2.11-1.10.1.jar:1.10.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
~[flink-dist_2.11-1.10.1.jar:1.10.1]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
~[flink-dist_2.11-1.10.1.jar:1.10.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
~[flink-dist_2.11-1.10.1.jar:1.10.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
~[flink_hotel_pyramidadsviewrtland_v3-13357881.jar:na]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

Re: Receiving context information through JobListener interface

2021-04-25 Thread Yangze Guo
It seems that the JobListener interface does not expose such
information. Maybe you can set the RuleId as the jobName (or as a suffix
of the jobName) of the application; then you can get the mapping of
jobId to jobName (RuleId) through /jobs/overview.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/rest_api/#jobs-overview
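
For illustration, a rough sketch of that workaround (the ruleId value, the job body
and the printing are placeholders; the listener itself only ever exposes the Flink
JobID, so the RuleId has to come from the surrounding application code or from the
job name):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RuleIdJobListenerSketch {
    public static void main(String[] args) throws Exception {
        final String ruleId = "rule-42"; // hypothetical business identifier
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                if (jobClient != null) {
                    // The RuleId is only known because it is captured from the enclosing
                    // scope; the listener itself just exposes the Flink JobID.
                    System.out.println("ruleId=" + ruleId + " jobId=" + jobClient.getJobID());
                    // ... publish a "submitted" event partitioned by ruleId to Kafka here
                }
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                // ... publish a terminal event partitioned by ruleId here
            }
        });

        env.fromElements(1, 2, 3).print(); // placeholder pipeline

        // Encode the RuleId into the job name so the mapping is also recoverable
        // later via the REST endpoint /jobs/overview.
        env.execute("my-app-" + ruleId);
    }
}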

Best,
Yangze Guo

On Sun, Apr 25, 2021 at 4:17 PM Barak Ben Nathan
 wrote:
>
>
>
> Hi all,
>
>
>
> I am building an application that launches Flink Jobs and monitors them.
>
>
>
> I want to use the JobListener interface to output job events to a Kafka topic.
>
>
>
> The problem:
>
> In the application we have a RuleId, i.e. a business logic identifier for the
> job, and there's the JobId, which is the internal identifier generated by Flink.
>
> I need the events emitted to Kafka to be partitioned by *RuleId*.
>
>
>
> Is there a way to pass this kind of information to Flink and get it through 
> the JobListener interface?
>
>
>
> Thanks,
>
> Barak


A question about flinkKafkaConsumer offset committing

2021-04-25 Thread lp
请教一下,flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);
和kafka自己的"enable.auto.commit"=true【默认就是true,
interval=5s】,在checkpoint的时候有啥区别,假如我已经enable了chk?

The Javadoc of the flinkKafkaConsumer.setCommitOffsetsOnCheckpoints() method reads as follows:

/**
 * Specifies whether or not the consumer should commit offsets back to
Kafka on checkpoints.
 *
 * This setting will only have effect if checkpointing is enabled for
the job. If
 * checkpointing isn't enabled, only the "auto.commit.enable" (for 0.8)
/ "enable.auto.commit"
 * (for 0.9+) property settings will be used.
 *
 * @return The consumer object, to allow function chaining.
 */

My understanding is: if checkpointing is enabled and
flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true) is set (it seems the default is already true),
then offsets are committed to Kafka at the checkpoint interval, and the auto.commit.enable
configuration is not used? Is that understanding correct?
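
For illustration, a minimal sketch of the configuration being asked about (broker
address, group id and topic are placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class OffsetCommitOnCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With checkpointing enabled, offsets are committed back to Kafka when a
        // checkpoint completes (here every 60s), not on Kafka's auto-commit timer.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder address
        props.setProperty("group.id", "my-group");            // placeholder group

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // Default is already true; shown explicitly. The committed offsets are only
        // a progress indicator / fallback start position - on recovery from a
        // checkpoint, the offsets stored in the checkpoint itself take precedence.
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("offset-commit-on-checkpoint");
    }
}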



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Too many checkpoint folders kept for externalized retention.

2021-04-25 Thread John Smith
No. But I decided to disable it finally

On Sun., Apr. 25, 2021, 5:14 a.m. Yun Gao,  wrote:

> Hi John,
>
> Logically the maximum retained checkpoints are configured
> by state.checkpoints.num-retained [1]. Have you configured
> this option?
>
>
> Best,
> Yun
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#state-checkpoints-num-retained
>
>
>
> --
> Sender:John Smith
> Date:2021/04/24 01:41:41
> Recipient:user
> Theme:Too many checkpoint folders kept for externalized retention.
>
> Hi, running 1.10.0.
>
> Just curious, is this specific to externalized retention or to checkpointing
> in general?
>
> I see my checkpoint folder containing thousands of chk-x folders.
>
> When using default checkpointing or NONE externalized checkpointing, does the
> count of chk- folders grow indefinitely until the job is killed, or does it
> retain only up to a certain amount?
>
> Thanks
>
>


Re: Writing to Avro from pyflink

2021-04-25 Thread Dian Fu
Hi Eddie,

I have tried your program with the following changes and it could execute 
successfully:
- Replace
`rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"`
with
`rf"file:///Users/dianfu/code/src/apache/flink/flink-sql-avro-1.12.3.jar"`

- Use flink-sql-avro-1.12.3.jar [1] instead of flink-sql-avro-1.12.2.jar as I 
encountered issue FLINK-21012 [2] which has been addressed in 1.12.3 

For your problem, I wonder whether
`file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar`
really exists.
Could you double-check that?

[1] 
https://repository.apache.org/content/repositories/orgapacheflink-1419/org/apache/flink/flink-sql-avro/1.12.3/flink-sql-avro-1.12.3.jar
[2] https://issues.apache.org/jira/browse/FLINK-21012 


Regards,
Dian

> On Apr 25, 2021, at 11:56 PM, Edward Yang wrote:
> 
> Hi Dian,
> 
> I tried your suggestion but had the same error message unfortunately. I also 
> tried file:/ and file:// with the same error, not sure what's going on, I 
> assume writing to avro works fine in java and scala? 
> 
> Eddie
> 
> On Sat, Apr 24, 2021 at 10:03 PM Dian Fu wrote:
> I guess you only need file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar.
> Could you remove flink-avro-1.12.2.jar and avro-1.10.2.jar and try again?
> 
> Regards,
> Dian
> 
>> On Apr 24, 2021, at 8:29 AM, Edward Yang wrote:
>> 
>> I've been trying to write to the avro format with pyflink 1.12.2 on ubuntu, 
>> I've tested my code with an iterator writing to csv and everything works as 
>> expected. Reading through the flink documentation I see that I should add 
>> jar dependencies to work with avro. I downloaded three jar files that I 
>> believe are required for avro like so:
>> 
>> table_env\
>> .get_config()\
>> .get_configuration()\
>> .set_string(
>> "pipeline.jars", 
>> 
>> rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"
>> )
>> 
>> I suspect I'm not loading the jar files correctly, but it's unclear what I'm 
>> supposed to do as I'm not familiar with java and when I switch the sink 
>> format to avro I get some unexpected errors: 
>> Py4JJavaError: An error occurred while calling o746.executeInsert.
>> : java.lang.NoClassDefFoundError: org/apache/avro/io/DatumWriter
>>  at 
>> org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:71)
>>  at 
>> org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:61)
>>  at 
>> org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:373)
>>  at 
>> org.apache.flink.table.filesystem.FileSystemTableSink.createOutputFormatFactory(FileSystemTableSink.java:365)
>>  at 
>> org.apache.flink.table.filesystem.FileSystemTableSink.createBatchSink(FileSystemTableSink.java:163)
>>  at 
>> org.apache.flink.table.filesystem.FileSystemTableSink.consume(FileSystemTableSink.java:139)
>>  at 
>> org.apache.flink.table.filesystem.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:134)
>>  at 
>> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
>>  at 
>> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:87)
>>  at 
>> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:42)
>>  at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>>  at 
>> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:42)
>>  at 
>> org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:86)
>>  at 
>> org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:85)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>  at 
>> org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:85)
>>  at 
>> 

kafka consumers partition count and parallelism

2021-04-25 Thread Prashant Deva
Can a Kafka Consumer Source have more tasks run in parallel than the number
of partitions for the topic it is the source of? Or is the max parallelism
of the source constrained by max partitions of the topic?


Re: Flink Event specific window

2021-04-25 Thread Swagat Mishra
 1. What if there are a very high number of users, like a million customers,
won't the service crash? Is it advisable to hold the data in memory?

2. What if state-functions are used to calculate the value ? How will this
approach differ from the one proposed below.

Regards,
Swagat

On Wed, Apr 21, 2021, 11:25 PM Arvid Heise  wrote:

> Hi Sunitha,
>
> the approach you are describing sounds like you want to use a session
> window. [1] If you only want to count them when they happen within the same hour,
> then you want to use a tumbling window.
>
> Your datastream approach looks solid.
>
> For SQL, there is also a session (and tumbling) window [2]. You can see
> examples at the bottom of the section.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#session-windows
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#group-windows
>
> On Tue, Apr 20, 2021 at 11:03 PM s_penakalap...@yahoo.com <
> s_penakalap...@yahoo.com> wrote:
>
>> Hi All,
>>
>> I have one requirement where I need to calculate the total amount of
>> transactions done by each user in the last 1 hour.
>> Say Customer1 has done 2 transactions, one at 11:00 am and the other at
>> 11:20 am.
>> Customer2 has done 1 transaction, at 10:00 am.
>> Customer3 has done 3 transactions, at 11:20 am, 11:40 am and 11:45 am.
>>
>> Whenever a customer does a transaction we receive an event in the source
>> topic; we consume the data and need to calculate the total amount spent by
>> the customer in the last 1 hour.
>>
>> If I receive a new customer1 transaction event at 11:30 am, then I
>> need to calculate the sum of the 3 events done by customer1 in the last 1 hour (i.e.
>> 11:00, 11:20 and 11:30 am - all 3 events fall in the last 1 hour window).
>> Now say I receive a new Customer2 transaction event at 11:30 am; then for
>> this customer I need to consider only one event, 11:30 (ignoring the event
>> at 10:00 am as it does not fall in the last 1 hr).
>> If Customer3's new transaction is done at 12:40 pm, then for this customer I
>> need to calculate the sum of (11:40 am, 11:45 am and 12:40 pm), as all 3 fall
>> under the last 1 hr.
>>
>> Approach I am planning to try:
>> Every event has the transaction time which I am using as event time to
>> assign WatermarkStrategy
>> KeyBy - customerId
>> SlidingEventTimeWindows of 1 hr
>> then process all elements using ProcessWindowFunction
>>
>>
>> Kindly suggest the approach I need to follow to achieve the above
>> scenario using Flink Java /Sql. I am using Flink 1.12.0.
>>
>> Regards,
>> Sunitha
>>
>


Re: Application cluster - Job execution and cluster creation timeouts

2021-04-25 Thread Tamir Sagi
Hey Yang, Community

As discussed a few weeks ago, I'm working on the Application Cluster - Native
K8s approach, running Flink 1.12.2.
We deploy application clusters programmatically, which works well.
In addition, we leverage the Kubernetes client (Fabric8io) to watch the
deployment/pod status and get an indication of whether the k8s cluster is up and
running.

Job Details
We read a file from S3 using the hadoop-s3-plugin
(https://ci.apache.org/projects/flink/flink-docs-stable/deployment/filesystems/s3.html#hadooppresto-s3-file-systems-plugins),
process the data and write it back to S3.

We fetch the job list using ClusterClient (client.listJobs()).
No savepoints/state backend are configured.

I would like to raise several questions regarding some scenarios I encountered
and would like to get your feedback.
These scenarios show the Flink application cluster (native k8s) behavior in case
of failures.


Scenario 1: Exception thrown before env.execute() gets called.
I deploy an application cluster and an exception is thrown before env.execute()
gets called.

Result: The exception is received, nothing gets cleaned up - the JobManager pod is still
running even though no jobs are running.

Question: How should we get the "real" cluster status? The JobManager pod is
running but the execution never occurred. (Memory leak?)

Scenario 2: Application state FAILED and no running jobs.
env.execute() gets called and an exception is thrown before the job starts.
I did not provide AWS credentials, so a
com.amazonaws.AmazonClientException was thrown,
which led to a "Caused by: org.apache.flink.runtime.client.JobExecutionException"
error.

The application state was FAILED, but the job list was empty (the jobs never started);
according to the
documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/client/program/ClusterClient.html#listJobs--),
it lists the currently running and finished jobs on the cluster.
I still see debug logs where Flink is aware of that exception and cleans up all the
resources.

[2021-04-25T13:59:59,569][Info] {} [o.a.f.r.e.ClusterEntrypoint]: Shutting 
KubernetesApplicationClusterEntrypoint down with application status FAILED. 
Diagnostics null.
[2021-04-25T13:59:59,569][Info] {} [o.a.f.r.j.MiniDispatcherRestEndpoint]: 
Shutting down rest endpoint.
[2021-04-25T13:59:59,767][Info] {} [o.a.f.r.r.a.ActiveResourceManager]: Shut 
down cluster because application is in FAILED, diagnostics null.
[2021-04-25T13:59:59,768][Info] {} [o.a.f.k.KubernetesResourceManagerDriver]: 
Deregistering Flink Kubernetes cluster, clusterId: test-flink-app-9645, 
diagnostics:

Result: The cluster gets destroyed; listJobs is empty until the client gets an
"UnknownHost" exception (the cluster no longer exists).
Question: How can we get the application state from outside the cluster, or catch the
JobExecutionException?


Scenario 3: Job starts and throws an exception; job status remains "In Progress".
Once the job is executed its status changes to "In Progress", the job list is
retrieved (within a few seconds) and for each job we query the job status via
"clusterClient.requestJobResult(jobId)". However, once the job fails, the result
never changes to "Failed"; instead the CompletableFuture gets an exception due to the max
number of retries.

Code snippet

try {
    CompletableFuture<JobResult> jobResultFuture = client.requestJobResult(flinkJobId);
    jobResultFuture.thenAccept(jobResult -> handleJobResult(jobResult))
        .exceptionally(throwable -> {
            handleJobResultWithException(flinkJobId, Optional.of(throwable));
            return null;
        });
} catch (Exception e) {
    handleGetJobResultWithException(flinkJobId, Optional.of(e));
}

Stacktrace

Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
Could not complete the operation. Number of retries has been exhausted.
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
 ~[flink-runtime_2.12-1.12.1.jar!/:1.12.1]
... 33 more
Caused by: java.util.concurrent.CompletionException: 
java.net.UnknownHostException: test-flink-app-24569-rest.testing
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) 
~[?:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) 
~[?:?]
... 31 more
Caused by: java.net.UnknownHostException: test-flink-app-24569-rest.testing
at java.net.InetAddress$CachedAddresses.get(Unknown Source) ~[?:?]


Result: The resources get cleaned up, and then the future can no longer reach the
cluster; we always end up in the exceptionally clause.
Question: Why does the job result never change to failed? What am I missing?


Highly appreciate your help.

Tamir.







Watermarks in Event Time Temporal Join

2021-04-25 Thread maverick
Hi,
I'm curious why the Event Time Temporal Join needs watermarks from both sides to
perform the join.

Shouldn't the watermark on the versioned table side be enough to perform the join?





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Dynamic Table Options are removed by the optimizer

2021-04-25 Thread macia kk
Hi,

  When using a temporal join I set dynamic options related to partition reading for the
table being read, but in the end they did not take effect; everything runs with the default
parameters. I printed the execution plan: the options are present in the logical plan, but
after optimization they are gone (a sketch of the hint syntax follows after the plans below).
  As shown below, I configured it to load the latest partition, reloading every 24 hours,
but the runtime logs show that all partitions are loaded, with a reload every hour - these
are all the default parameters, so I suspect the dynamic options did not take effect.


== Abstract Syntax Tree ==
+- LogicalSnapshot(period=[$cor0.proctime])
   +- LogicalTableScan(table=[[ds, my_db, store_da_table,
source: [HiveTableSource(store_id, store_name, merchant_id, tag_id,
brand_id, tob_user_id, is_use_wallet, is_use_merchant_app, longitude,
latitude, state, city, district, address, postal_code, register_phone,
email, email_source, register_time, logo, banner, partner_type,
commission_rate, tax_rate, service_fee, min_spend, delivery_distance,
preparation_time, contact_phone, store_status, closed_start_time,
closed_end_time, effective_closed_end_time, auto_confirmed,
auto_confirmed_enabled, create_time, update_time, rating_total,
rating_score, opening_status, surcharge_intervals, service_charge_fee_rate,
driver_modify_order_enabled, delivery_distance_mode, business_info_added,
mtime, dt, grass_region) TablePath: my_db.store_da_table, PartitionPruned:
false, PartitionNums: null], dynamic options:
{streaming-source.enable=true, streaming-source.monitor-interval=24 h,
streaming-source.partition.include=latest}]])

== Optimized Logical Plan ==
Calc(select=[_UTF-16LE'v4' AS version, _UTF-16LE'ID' AS country, city, id,
event_time, operation, platform, payment_method, gmv, 0.0:DECIMAL(2, 1) AS
gmv_usd], where=[NOT(LIKE(UPPER(store_name), _UTF-16LE'%[TEST]%'))])
+- LookupJoin(table=[ds.my_db.store_da_table],
joinType=[LeftOuterJoin], async=[false], lookup=[store_id=store_id],
select=[city, id, event_time, operation, platform, payment_method, gmv,
store_id, store_id, store_name])
   +- Union(all=[true], union=[city, id, event_time, operation, platform,
payment_method, gmv, store_id])
  :- Calc(select=[delivery_city AS city, id, /(CAST(create_time), 1000)
AS event_time, CASE(OR(=(order_status, 440), =(order_status, 800)),
_UTF-16LE'NET':VARCHAR(5) CHARACTER SET "UTF-16LE",
_UTF-16LE'GROSS':VARCHAR(5) CHARACTER SET "UTF-16LE") AS operation,
_UTF-16LE'' AS platform, payment_method, /(CAST(total_amount), 10)
AS gmv, CAST(store_id) AS store_id])
  :  +- DataStreamScan(table=[[ds, keystats,
main_db__transaction_tab]], fields=[id, delivery_city, store_id,
create_time, payment_time, order_status, payment_method, total_amount,
proctime], reuse_id=[1])
  +- Calc(select=[delivery_city AS city, id, /(CAST(payment_time),
1000) AS event_time, _UTF-16LE'NET':VARCHAR(5) CHARACTER SET "UTF-16LE" AS
operation, _UTF-16LE'AIRPAY' AS platform, payment_method,
/(CAST(total_amount), 10) AS gmv, CAST(store_id) AS store_id],
where=[OR(=(order_status, 440), =(order_status, 800))])
 +- Reused(reference_id=[1])
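
For reference, a sketch of the hint usage being described (catalog, table and column names
are simplified placeholders, and an already-registered orders table with a proctime attribute
is assumed); note that OPTIONS hints also require table.dynamic-table-options.enabled, which
defaults to false:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DynamicOptionsHintSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Prerequisite for using OPTIONS hints at all (defaults to false).
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.dynamic-table-options.enabled", true);

        // Temporal (lookup) join against the Hive table, overriding its partition
        // loading behaviour only for this query via the OPTIONS hint.
        tEnv.executeSql(
            "SELECT o.*, s.store_name " +
            "FROM orders AS o " +
            "JOIN my_db.store_da_table " +
            "  /*+ OPTIONS('streaming-source.enable'='true', " +
            "              'streaming-source.partition.include'='latest', " +
            "              'streaming-source.monitor-interval'='24 h') */ " +
            "  FOR SYSTEM_TIME AS OF o.proctime AS s " +
            "ON o.store_id = s.store_id");
    }
}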


Re: Writing to Avro from pyflink

2021-04-25 Thread Edward Yang
Hi Dian,

I tried your suggestion but had the same error message unfortunately. I
also tried file:/ and file:// with the same error, not sure what's going
on, I assume writing to avro works fine in java and scala?

Eddie

On Sat, Apr 24, 2021 at 10:03 PM Dian Fu  wrote:

> I guess you only need file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar
> . Could you remove flink-avro-1.12.2.jar and avro-1.10.2.jar and try
> again?
>
> Regards,
> Dian
>
> On Apr 24, 2021, at 8:29 AM, Edward Yang wrote:
>
> I've been trying to write to the avro format with pyflink 1.12.2 on
> ubuntu, I've tested my code with an iterator writing to csv and everything
> works as expected. Reading through the flink documentation I see that I
> should add jar dependencies to work with avro. I downloaded three jar files
> that I believe are required for avro like so:
>
> table_env\
> .get_config()\
> .get_configuration()\
> .set_string(
> "pipeline.jars",
> rf"
> file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar
> "
> )
>
> I suspect I'm not loading the jar files correctly, but it's unclear what
> I'm supposed to do as I'm not familiar with java and when I switch the sink
> format to avro I get some unexpected errors:
>
> Py4JJavaError: An error occurred while calling o746.executeInsert.
> : java.lang.NoClassDefFoundError: org/apache/avro/io/DatumWriter
>   at 
> org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:71)
>   at 
> org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:61)
>   at 
> org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:373)
>   at 
> org.apache.flink.table.filesystem.FileSystemTableSink.createOutputFormatFactory(FileSystemTableSink.java:365)
>   at 
> org.apache.flink.table.filesystem.FileSystemTableSink.createBatchSink(FileSystemTableSink.java:163)
>   at 
> org.apache.flink.table.filesystem.FileSystemTableSink.consume(FileSystemTableSink.java:139)
>   at 
> org.apache.flink.table.filesystem.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:134)
>   at 
> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:87)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:42)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:42)
>   at 
> org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:86)
>   at 
> org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:85)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:85)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
>   at 
> org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at 
> 

Re: Approaches for external state for Flink

2021-04-25 Thread David Anderson
>
> Now, I'm just worried about the state size. State size will grow forever.
> There is no TTL.


The potential for unbounded state is certainly a problem, and it's going to
be a problem no matter how you implement the deduplication. Standard
techniques for mitigating this include (1) limiting the timeframe for
deduplication, and/or (2) using bloom filters to reduce the storage needed
in exchange for some (bounded percentage of) false positives.  But since
you must store data from stream1 to use later for enrichment, I think bloom
filters are only potentially relevant for deduplicating stream2.
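
For illustration, here is a minimal sketch of technique (1) using Flink keyed state with a
TTL (the Event type, the uuid key and the 7-day TTL are assumptions; the same shape works
with RocksDB as the state backend):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DedupWithTtl extends KeyedProcessFunction<String, DedupWithTtl.Event, DedupWithTtl.Event> {

    // Placeholder event type; the stream is keyed by uuid upstream.
    public static class Event {
        public String uuid;
        public String payload;
    }

    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Boolean.class);
        // Keep a key's dedup marker for 7 days only; this bounds state growth by
        // limiting the timeframe for deduplication (technique 1 above).
        desc.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(7)).build());
        seen = getRuntimeContext().getState(desc);
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        if (seen.value() == null) { // first time this uuid is seen within the TTL window
            seen.update(true);
            out.collect(event);
        }
        // duplicates are silently dropped
    }
}

It would be applied as stream.keyBy(e -> e.uuid).process(new DedupWithTtl()).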

Do you have any temporal constraints on how the enrichment of stream2 is
done? For example, if an event from stream2 arrives before the
corresponding event from stream1 has been processed, can you simply ignore
the event from stream2? Or should it be buffered, and enriched later? I ask
this because checkpointing can become challenging at scale when joining two
streams, if there's a requirement to buffer one of the streams so the other
can catch up.

Flink may or may not be the best choice for your application. The devil is
in the details.

Regards,
David

On Sun, Apr 25, 2021 at 12:25 PM Omngr 
wrote:

> Thank you David. That's perfect.
>
> Now, I'm just worried about the state size. State size will grow forever.
> There is no TTL.
>
> On Sat, Apr 24, 2021 at 17:42, David Anderson wrote:
>
>> What are the other techniques for bootstrapping rocksdb state?
>>
>>
>> Bootstrapping state involves somehow creating a snapshot (typically a
>> savepoint, but a retained checkpoint can be a better choice in some cases)
>> containing the necessary state -- meaning that the state has the same
>> operator uid and and state descriptor used by the real streaming job.
>>
>> You can do this by either: (1) running a variant of the live streaming
>> job against the data used for bootstrapping and taking a snapshot when the
>> data has been fully ingested, or (2) by using the State Processor API [1].
>> You'll find a trivial example of the second approach in [2]. Once you have
>> a suitable snapshot, you can run your real job against it.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/libs/state_processor_api.html
>> [2] https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf
>>
>> Regards,
>> David
>>
>> On Sat, Apr 24, 2021 at 3:01 PM Omngr 
>> wrote:
>>
>>> Hi David, thank you for your response first!
>>>
>>> The state size is about 1 TB for now, but it will increase fastly, and
>>> also I can not use the TLL for states. It will grow indefinitely.
>>> What are the other techniques for bootstrapping rocksdb state?
>>>
>>> On Sat, Apr 24, 2021 at 15:43, David Anderson wrote:
>>>
 Oguzhan,

 Note, the state size is very large and I have to feed the state from
> batch flow firstly. Thus I can not use the internal state like rocksdb.


 How large is "very large"? Using RocksDB, several users have reported
 working with jobs using many TBs of state.

 And there are techniques for bootstrapping the state. That doesn't have
 to be a showstopper.

 May be any bottleneck in that flow? I think to use asyncMap functions
> for state read/write operations.


 That's a good reason to reconsider using Flink state.

 Regards,
 David



 On Fri, Apr 23, 2021 at 12:22 PM Oğuzhan Mangır <
 sosyalmedya.oguz...@gmail.com> wrote:

> I'm trying to design a stream flow that checks *de-duplicate* events
> and sends them to the Kafka topic.
>
> Basically, flow looks like that;
>
> kafka (multiple topics) =>  flink (checking de-duplication and event
> enrichment) => kafka (single topic)
>
> For de-duplication, I'm thinking of using Cassandra as an external
> state store. The details of my job;
>
> I have an event payload with a *uuid* field. If an event with the
> same uuid comes again, it should be discarded. In my case, two Kafka
> topics are being read. The first topic has a lot of fields, but the other topic
> just has a *uuid* field, thus I have to enrich the data using the same
> uuid for the events coming from the second topic.
>
> Stream1: Messages read from the first topic. Read state from
> Cassandra using the *uuid*. If state exists, ignore this event and *do
> not* emit it to Kafka. If state does not exist, save this event to
> Cassandra, then emit this event to Kafka.
>
> Stream2: Messages read from the second topic. Read state from
> Cassandra using the *uuid*. If state exists, check a column that
> indicates whether this event came from topic2. If the value of this column is
> false, enrich the event using the state and update the Cassandra column to
> true. If true, ignore this event because it is a duplicate.
>
> def checkDeDuplication(event): 

Wrong options for the Kafka connector make the cluster unable to run any job

2021-04-25 Thread chenxuying
environment:

flinksql 1.12.2

k8s session mode

Description:

I got the following error log when my Kafka connector port was wrong:

>

2021-04-25 16:49:50

org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition filebeat_json_install_log-3 could be 
determined

>




I got the following error log when my Kafka connector IP was wrong:

>

2021-04-25 20:12:53

org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)

at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)

at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)

at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)

at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

at akka.actor.ActorCell.invoke(ActorCell.scala:561)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

at akka.dispatch.Mailbox.run(Mailbox.scala:225)

at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired 
while fetching topic metadata

>




When the job was cancelled, there was the following error log:

>

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
v2_ods_device_action_log (fcc451b8a521398b10e5b86153141fbf) switched from state 
CANCELLING to CANCELED.

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Stopping 
checkpoint coordinator for job fcc451b8a521398b10e5b86153141fbf.

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - 
Shutting down

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.CompletedCheckpoint  [] - Checkpoint 
with ID 1 at 
'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-1'
 not discarded.

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.CompletedCheckpoint  [] - Checkpoint 
with ID 2 at 
'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-2'
 not discarded.

2021-04-25 08:53:41,116 INFO  
org.apache.flink.runtime.checkpoint.CompletedCheckpoint  [] - Checkpoint 
with ID 3 at 
'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-3'
 not discarded.

2021-04-25 08:53:41,137 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
fcc451b8a521398b10e5b86153141fbf reached globally terminal state CANCELED.

2021-04-25 08:53:41,148 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Stopping the JobMaster for job 

Re: Guaranteeing exactly-once across multiple complex operators

2021-04-25 Thread hk__lrzy
All operators need to maintain it.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


In Flink SQL, when a job's Kafka connection information is wrong, the whole session cluster can no longer deploy jobs properly

2021-04-25 Thread chenxuying
Environment:

Flink SQL 1.12.2

k8s session mode

Description:

When the Kafka port is wrong, after a while the following error appears:

2021-04-25 16:49:50

org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition filebeat_json_install_log-3 could be 
determined

When the Kafka IP is wrong, after a while the following error appears:

2021-04-25 20:12:53

org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)

at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)

at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)

at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)

at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

at akka.actor.ActorCell.invoke(ActorCell.scala:561)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

at akka.dispatch.Mailbox.run(Mailbox.scala:225)

at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired 
while fetching topic metadata







Then, when stopping/cancelling the job, the following error appears:

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
v2_ods_device_action_log (fcc451b8a521398b10e5b86153141fbf) switched from state 
CANCELLING to CANCELED.

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Stopping 
checkpoint coordinator for job fcc451b8a521398b10e5b86153141fbf.

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - 
Shutting down

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.CompletedCheckpoint  [] - Checkpoint 
with ID 1 at 
'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-1'
 not discarded.

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.CompletedCheckpoint  [] - Checkpoint 
with ID 2 at 
'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-2'
 not discarded.

2021-04-25 08:53:41,116 INFO  
org.apache.flink.runtime.checkpoint.CompletedCheckpoint  [] - Checkpoint 
with ID 3 at 
'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-3'
 not discarded.

2021-04-25 08:53:41,137 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
fcc451b8a521398b10e5b86153141fbf reached globally terminal state CANCELED.

2021-04-25 08:53:41,148 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Stopping the JobMaster for job 
v2_ods_device_action_log(fcc451b8a521398b10e5b86153141fbf).

2021-04-25 08:53:41,151 INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending 
SlotPool.

2021-04-25 08:53:41,151 

Re: Question about Kafka offsets in Flink SQL metrics

2021-04-25 Thread 占英华
Thank you very much!

> On Apr 25, 2021, at 19:19, JasonLee <17610775...@163.com> wrote:
> 
> hi
> 
> currentOffsets: the consumer's current read offset for each partition. committedOffsets:
> the offset of the last successful commit to Kafka. Because the offsets of a Kafka
> partition start at 0, committedOffsets will be 1 greater than currentOffsets.
> 
> 
> 
> -
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: Question about Kafka offsets in Flink SQL metrics

2021-04-25 Thread JasonLee
hi

currentOffsets: the consumer's current read offset for each partition. committedOffsets:
the offset of the last successful commit to Kafka. Because the offsets of a Kafka
partition start at 0, committedOffsets will be 1 greater than currentOffsets.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Submitting to a Flink SQL Kafka table throws exception: cannot load user class

2021-04-25 Thread maker_d...@foxmail.com
hi

From the error it looks like the flink-sql-kafka jar is missing; add that jar to Flink/lib and run it again.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/



Re: How to extract a custom data structure from a Flink stream and assign it to a variable

2021-04-25 Thread 张锴
Can the side-output approach extract the value on its own? How would I get the value that way?

JasonLee <17610775...@163.com> wrote on Sun, Apr 25, 2021 at 5:58 PM:

> hi
>
> You can use filter to split out multiple streams, or use side outputs to split the stream for separate processing.
>
>
>
> -
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: Submitting to a Flink SQL Kafka table throws exception: cannot load user class

2021-04-25 Thread maker_d...@foxmail.com
Hello,
The jar flink-sql-connector-kafka_2.11-1.11.3.jar
is already in Flink's lib directory.



maker_d...@foxmail.com
 
From: JasonLee
Sent: 2021-04-25 17:56
To: user-zh
Subject: Re: Submitting to a Flink SQL Kafka table throws exception: cannot load user class
hi
 
From the error it looks like the flink-sql-kafka jar is missing; add that jar to Flink/lib and run it again.
 
 
 
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/
 


Re: How to extract a custom data structure from a Flink stream and assign it to a variable

2021-04-25 Thread JasonLee
hi

You can use filter to split out multiple streams, or use side outputs to split the stream for separate processing.
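
For illustration, a minimal Java sketch of the side-output variant (the thread's own code
is Scala, but the API is the same; the tag name and element types here are placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "", "b");

        // Anonymous subclass so the OutputTag keeps its type information.
        final OutputTag<String> emptyTag = new OutputTag<String>("empty-records") {};

        SingleOutputStreamOperator<String> main =
                stream.process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        if (value.isEmpty()) {
                            ctx.output(emptyTag, value); // routed to the side output
                        } else {
                            out.collect(value);          // stays on the main stream
                        }
                    }
                });

        // The side output is still a DataStream, not a single variable, so its
        // "value" has to be consumed by downstream operators (print, sink, ...).
        DataStream<String> empties = main.getSideOutput(emptyTag);
        empties.print();
        main.print();

        env.execute("side-output-sketch");
    }
}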



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Submitting to a Flink SQL Kafka table throws exception: cannot load user class

2021-04-25 Thread JasonLee
hi

From the error it looks like the flink-sql-kafka jar is missing; add that jar to Flink/lib and run it again.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Submitting to a Flink SQL Kafka table throws exception: cannot load user class

2021-04-25 Thread maker_d...@foxmail.com
Hi everyone in the community,
I want to read a MySQL table via flink-cdc and then send it to a Kafka table.
When I use the sql-client to insert data into the Kafka table, I get the following error:

2021-04-25 17:21:03
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:590)
at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:384)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
ClassLoader info: URL ClassLoader:
file: 
'/data/yarn/nm/usercache/flink/appcache/application_1618375297719_0009/blobStore-656e7e03-d94c-4861-b492-aeca2e5b4218/job_3682b0f430839794beb0d09e8e53b416/blob_p-e79c4e89fbdd13c78a3a0602a35a8c6f2ab35ebc-2f20c3259bf505db1bb258562da113c0'
 (valid JAR)
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:272)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:155)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 

How to extract a custom data structure from a Flink stream and assign it to a variable

2021-04-25 Thread 张锴
Flink version 1.12.2. I have a requirement to take a custom data structure (call it a) out of a stream,
assign it to a variable used later, and then use a's fields for some conditional checks.
For example:
   val ds: DataStream[b] = stream.filter(_.nonEmpty).map(new
MapFunction[String, b] {

  override def map(value: String) = {
  val recallKafka = JSON.parseObject(value, classOf[a])

  b(recallKafka.group_id, value, recallKafka.eventTime)

  }
})

val kafkaCommonData: a = recallKafka
// conditions to check
if (kafkaCommonData.data.date != null) {x}
if (kafkaCommonData.data.userinfo != null) {}
...
My question: how can I extract a particular data structure from the stream on its own? If there is a way,
how should it be written? I've been stuck on this for several days.




Reading a Hive table in stream mode with distinct causes heap OOM

2021-04-25 Thread 张颖
Hi, I ran into a situation like this:


This is my SQL:
SELECT distinct header,label,reqsig,dense_feat,item_id_feat,user_id_feat FROM 
app.app_ranking_feature_table_clk_ord_hp_new_all_tree_orc where dt='2021-04-01'




When I useBlinkPlanner inBatchMode, It works well; But if I set inStreamMode,
It cause a heap OOM.


Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at org.apache.flink.core.memory.DataOutputSerializer.getCopyOfBuffer(DataOutputSerializer.java:85)
at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:113)
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:129)
at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:399)
at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$221/285424866.runDefaultAction(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:620)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:584)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:844)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:636)
at java.lang.Thread.run(Thread.java:748)



I use RocksDB, and I have confirmed it is in use; then I ran jmap on the TaskManager:
 num     #instances         #bytes  class name
----------------------------------------------
   1:        214656     4420569368  [C
   2:            99     2376771576  [B
   3:        137904        7722624  org.apache.flink.core.memory.HybridMemorySegment
   4:        214539        5148936  java.lang.String
   5:         31796        2635104  [Ljava.lang.Object;
   6:        105133        2523192  [Lorg.apache.flink.core.memory.MemorySegment;
   7:        105115        2522760  org.apache.flink.table.data.binary.BinarySection
   8:        105115        2522760  org.apache.flink.table.data.binary.BinaryStringData
   9:         32812        2099968  java.nio.DirectByteBuffer
  10:         14838        1651560  java.lang.Class
  11:         50002        1600064  java.util.concurrent.ConcurrentHashMap$Node
  12:         43014        1376448  java.util.Hashtable$Entry
  13:         32805        1312200  sun.misc.Cleaner



It looks like the data is on the heap rather than in RocksDB. Is there any way to get this data into RocksDB?
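
For reference, here is a minimal Java sketch of how the RocksDB state backend is typically wired in explicitly for a Flink 1.12-style job (the checkpoint URI and class name below are placeholders). Note that even with RocksDB, only the keyed state of the aggregation lives in RocksDB; the records flowing between operators (the BinaryStringData and memory segments in the histogram above) are still held on the heap and in managed memory:

   import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

   public class RocksDbDistinctJob {
     public static void main(String[] args) throws Exception {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       // placeholder checkpoint URI; 'true' enables incremental checkpoints
       env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

       StreamTableEnvironment tEnv = StreamTableEnvironment.create(
           env, EnvironmentSettings.newInstance().inStreamingMode().build());
       // ... register the Hive catalog on tEnv and run the DISTINCT query here
     }
   }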



Re: Too many checkpoint folders kept for externalized retention.

2021-04-25 Thread Yun Gao
Hi John,

Logically the maximum retained checkpoints are configured
by state.checkpoints.num-retained [1]. Have you configured 
this option?


Best,
Yun

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#state-checkpoints-num-retained
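
For reference, the corresponding flink-conf.yaml entry would look like the following; the value 3 is only an illustrative example (the default is 1):

state.checkpoints.num-retained: 3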



--
Sender:John Smith
Date:2021/04/24 01:41:41
Recipient:user
Theme: Too many checkpoint folders kept for externalized retention.

Hi, running 1.10.0.

Just curious, is this specific to externalized retention or to checkpointing in general?

I see my checkpoint folder contains thousands of chk-x folders.

If using default checkpointing or NONE externalized checkpointing, does the count of chk- folders grow indefinitely until the job is killed, or does it only retain up to a certain amount?

Thanks 


Receiving context information through JobListener interface

2021-04-25 Thread Barak Ben Nathan

Hi all,

I am building an application that launches Flink Jobs and monitors them.

I want to use the JobListener interface to output job events to a Kafka topic.

The problem:
In the application we have a RuleId, i.e. a business-logic identifier for the job, and there is a JobId, which is the internal identifier generated by Flink.
I need the events emitted to Kafka to be partitioned by *RuleId*.

Is there a way to pass this kind of information to Flink and get it through the 
JobListener interface?

Thanks,
Barak
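
One possible workaround (just a sketch, not an official mechanism; the topic name, payload format and helper structure below are made up): since the launching application already knows the RuleId at submission time, it can capture it in a closure when registering a JobListener, key the Kafka record by RuleId, and carry the Flink-generated JobId only in the payload:

   import java.util.Properties;

   import org.apache.flink.api.common.JobExecutionResult;
   import org.apache.flink.core.execution.JobClient;
   import org.apache.flink.core.execution.JobListener;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerRecord;

   public class RuleAwareLauncher {

     // kafkaProps must contain bootstrap.servers and key/value serializers;
     // "job-events" is a placeholder topic name
     static void launch(StreamExecutionEnvironment env, String ruleId, Properties kafkaProps) {
       KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);

       env.registerJobListener(new JobListener() {
         @Override
         public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
           if (jobClient != null) {
             // keyed by ruleId, so all events of one rule land in the same partition
             producer.send(new ProducerRecord<>(
                 "job-events", ruleId, "SUBMITTED " + jobClient.getJobID()));
           }
         }

         @Override
         public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
           producer.send(new ProducerRecord<>(
               "job-events", ruleId, throwable == null ? "FINISHED" : "FAILED"));
         }
       });
     }
   }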


Flink missing Kafka records

2021-04-25 Thread Dan Hill
Hi!

Have any other devs noticed issues with Flink missing Kafka records with
long-running Flink jobs?  When I re-run my Flink job and start from the
earliest Kafka offset, Flink processes the events correctly.  I'm using
Flink v1.11.1.

I have a simple job that takes records (Requests) from Kafka and serializes
them to S3.  Pretty basic.  No related issues in the text logs.  I'm hoping
I just have a configuration issue.  I'm guessing idleness is working in a
way that I'm not expecting.

Any ideas?
- Dan


void createLogRequestJob(StreamExecutionEnvironment env) throws Exception {
  Properties kafkaSourceProperties = getKafkaSourceProperties("logrequest");

  SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(
      new FlinkKafkaConsumer<Request>(getInputRequestTopic(),
          getProtoDeserializationSchema(Request.class), kafkaSourceProperties))
      .uid("source-request")
      .name("Request")
      .assignTimestampsAndWatermarks(
          WatermarkStrategy.<Request>forBoundedOutOfOrderness(maxOutOfOrderness)
              .withIdleness(Duration.ofMinutes(1)));

  executeLogRequest(rawRequestInput);
  env.execute("log-request");
}

void executeLogRequest(SingleOutputStreamOperator<Request> rawRequestInput) {
  AvroWriterFactory<Request> factory = getAvroWriterFactory(Request.class);

  rawRequestInput.addSink(StreamingFileSink
      .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"), factory)
      .withBucketAssigner(new DateHourBucketAssigner(request ->
          request.getTiming().getEventApiTimestamp()))
      .withRollingPolicy(OnCheckpointRollingPolicy.build())
      .withOutputFileConfig(createOutputFileConfig())
      .build())
      .uid("sink-s3-raw-request")
      .name("S3 Raw Request");
}
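
For what it's worth, here is a minimal sketch of the "re-run from the earliest offset" variant of the consumer above (reusing the helper methods from the job as-is, watermark assignment omitted); note that when the job is restored from a checkpoint or savepoint, the offsets stored there take precedence over this start position:

   FlinkKafkaConsumer<Request> consumer = new FlinkKafkaConsumer<>(
       getInputRequestTopic(),
       getProtoDeserializationSchema(Request.class),
       kafkaSourceProperties);
   // read the topic from the beginning instead of the committed group offsets
   consumer.setStartFromEarliest();

   SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(consumer)
       .uid("source-request")
       .name("Request");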


Flink SQL using CDC to sync PostgreSQL data to ES reports an error: org.postgresql.util.PSQLException: ERROR: out of memory

2021-04-25 Thread william
org.apache.kafka.connect.errors.ConnectException: An exception occurred in
the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[flink-sql-connector-postgres-cdc-1.2.0.jar:1.2.0]
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:150) ~[flink-sql-connector-postgres-cdc-1.2.0.jar:1.2.0]
at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:140) ~[flink-sql-connector-postgres-cdc-1.2.0.jar:1.2.0]
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:113) ~[flink-sql-connector-postgres-cdc-1.2.0.jar:1.2.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_202]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_202]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
Caused by: org.postgresql.util.PSQLException: ERROR: out of memory
Detail: cannot enlarge the string buffer containing 1073741350 bytes by 525 more bytes.
Where: slot "xxx_xxx", output plugin "wal2json", in the change callback, associated LSN 690/69ABCE18





A question about Kafka offsets in Flink SQL metrics

2021-04-25 Thread 邮件帮助中心
After a Flink SQL job is submitted, the JobManager monitoring metrics show two Kafka offset metrics, currentOffsets and committedOffsets. With no new data arriving in Kafka, after the job has been running for a while the metrics dashboard shows:
currentOffsets: 2897
committedOffsets: 2898
These two values no longer change (the data should have been fully consumed). My question: why are these two offset values still different? Does committedOffsets mean the offset that has been committed and stored in state? And what does currentOffsets mean? Any guidance would be much appreciated, thanks!