Flink CDC consuming the MySQL binlog starts from the beginning every time

2021-04-23 Thread 董建
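Hi all, I recently started using Flink CDC to read the MySQL binlog. Every time the job is restarted or stopped, it starts consuming again from the first row of the table.
Checkpointing and persistence are enabled, and the log reports that checkpoints succeed, so I don't understand why the application always starts from the beginning after a restart.
According to the official definition,
.startupOptions(StartupOptions.initial()) should give history + incremental changes.
Flink version: 1.12.2
Flink CDC version: flink-sql-connector-mysql-cdc-1.4-SNAPSHOT.jar
Relevant core code: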
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(
        new RocksDBStateBackend(config.getProperty("stateBackend.path")).getCheckpointBackend());
CheckpointConfig checkpointConfig = env.getCheckpointConfig();

// Keep the externalized checkpoint when the job is cancelled.
checkpointConfig.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Checkpoint every 10 seconds.
env.enableCheckpointing(10 * 1000);
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

DebeziumSourceFunction sourceMilApplysLogStream =
        MySQLSource.builder()
                .hostname(config.getProperty("datasource.db"))
                .port(Integer.parseInt(config.getProperty("datasource.port")))
                .username(config.getProperty("datasource.username"))
                .password(config.getProperty("datasource.password"))
                .databaseList(config.getProperty("datasource.databaseList"))
                .tableList(config.getProperty("datasource.tableList"))
                .deserializer(new DebeziumDeserialization())
                .serverId(Integer.parseInt(config.getProperty("datasource.server-id")))
                .startupOptions(StartupOptions.initial())
                .build();
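
One thing that may be worth checking (an assumption, since the post does not show how the job is resubmitted): with RETAIN_ON_CANCELLATION the checkpoint stays on disk, but a freshly submitted job does not pick it up by itself; the path has to be passed explicitly, for example with `flink run -s <checkpoint-path>`. The Python sketch below only locates the newest retained `chk-N` directory that contains a `_metadata` file. The function name and the directory passed in are hypothetical.

```python
import re
from pathlib import Path
from typing import Optional

# Retained checkpoints are written as chk-<number> subdirectories.
CHK_DIR = re.compile(r"^chk-(\d+)$")


def latest_checkpoint(job_checkpoint_dir: str) -> Optional[Path]:
    """Return the chk-N subdirectory with the highest N that holds a _metadata file.

    Directories without _metadata are skipped because they may be incomplete.
    Returns None if nothing suitable is found.
    """
    root = Path(job_checkpoint_dir)
    if not root.is_dir():
        return None
    best_n, best_path = -1, None
    for child in root.iterdir():
        match = CHK_DIR.match(child.name)
        if match and child.is_dir() and (child / "_metadata").is_file():
            n = int(match.group(1))
            if n > best_n:
                best_n, best_path = n, child
    return best_path
```

The returned path would then be the argument to `-s` on resubmission; the sketch handles local directories only.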





Re: Flink job fails to consume from Kafka, cannot obtain offset values

2021-04-23 Thread liang zhao
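First, check the network connectivity between Flink and cluster B. Second, check the state of cluster B itself: whether all topics are unreachable or only one, and whether any node is down. Also check whether Kafka's _offset_consumer happens to live on a single node that is down. If only one topic is affected, look at its partitions.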
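
As a rough illustration of the first check, here is a minimal Python sketch that tests plain TCP reachability of each broker from the machine (or container) it runs on. The function names and broker addresses are hypothetical. A successful TCP connect does not prove that consumption will work, because the addresses the brokers advertise to clients may differ from the bootstrap addresses.

```python
import socket
from typing import Iterable, List, Tuple


def parse_brokers(bootstrap_servers: str) -> List[Tuple[str, int]]:
    """Split 'host1:9092,host2:9092' into (host, port) pairs."""
    pairs = []
    for entry in bootstrap_servers.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, _, port = entry.rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def unreachable(brokers: Iterable[Tuple[str, int]],
                timeout_s: float = 3.0) -> List[Tuple[str, int]]:
    """Return the brokers for which a TCP connection could not be opened."""
    failed = []
    for host, port in brokers:
        try:
            # Open and immediately close a connection; no Kafka protocol is spoken.
            with socket.create_connection((host, port), timeout=timeout_s):
                pass
        except OSError:
            failed.append((host, port))
    return failed
```

Running `unreachable(parse_brokers("broker-b1:9092,broker-b2:9092"))` inside the TaskManager container (the host names are placeholders) would list the brokers that cannot be reached from there.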

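> On Apr 23, 2021, at 18:45, wysstar...@163.com wrote:
> 
> It looks like Flink cannot connect to Kafka. I suggest looking into the Docker network settings.
> 
> Sent from my iPhone
> 
>> On Apr 23, 2021, at 12:56 PM, Qingsheng Ren  wrote:
>> 
>> Hi Jacob,
>> 
>> Judging from the error, the Kafka consumer did not manage to connect to the Kafka brokers. The following may help track down the problem:
>> 
>> 1. Verify the network connectivity between the Flink TaskManagers and the Kafka brokers.
>> 2. Network connectivity between the Flink TaskManagers and the Kafka brokers does not by itself mean
>> that data can be consumed; you may need to change the Kafka broker configuration. This article [1] may
>> help: the vast majority of Kafka connection problems are caused by the configuration issues it describes.
>> 3. Configure Log4j to set the log level of org.apache.kafka.clients.consumer to DEBUG or
>> TRACE, so that the logs contain more information for troubleshooting.
>> 
>> Hope this helps!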
>> 
>> [1] 
>> https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
>> 
>> —
>> Best Regards,
>> 
>> Qingsheng Ren
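>> On Apr 14, 2021, 12:13 PM +0800, Jacob <17691150...@163.com> wrote:
>>> A Flink job consumes messages from a Kafka topic. The topic exists in two Kafka clusters, cluster A and
>>> cluster B. Consuming the topic from cluster A works fine, but after switching to cluster B the job fails to start.
>>> 
>>> The Flink cluster is deployed with Docker in standalone mode. The cluster has sufficient resources and slots. The error log is as follows: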
>>> 
>>> java.lang.Exception: org.apache.kafka.common.errors.TimeoutException:
>>> Timeout of 6ms expired before the position for partition Test-topic-27
>>> could be determined
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of
>>> 6ms expired before the position for partition Test-topic-27 could be
>>> determined
>>> 
>>> Searching around, most answers attribute this to insufficient slots or to Kafka broker load; those causes have already been ruled out.
>>> 
>>> Please advise.
>>> 
>>> 
>>> 
>>> -
>>> Thanks!
>>> Jacob
>>> --
>>> Sent from: http://apache-flink.147419.n8.nabble.com/
>> 
> 
> 



pojo warning when using auto generated protobuf class

2021-04-23 Thread Prashant Deva
I am seeing this warning msg when trying to use a custom protobuf
de/serializer with kafka source with auto generated java protobuf class:

18:41:31.164 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor
- Class class com.xx.APITrace cannot be used as a POJO type because not all
fields are valid POJO fields, and must be processed as GenericType. Please
read the Flink documentation on "Data Types & Serialization" for details of
the effect on performance.

Here is my serializer. What am I doing wrong?

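import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation

class ApiTraceSchema : DeserializationSchema<Trace.APITrace>,
    SerializationSchema<Trace.APITrace> {

    // Type information is derived from the generated protobuf class.
    override fun getProducedType(): TypeInformation<Trace.APITrace> {
        return TypeInformation.of(Trace.APITrace::class.java)
    }

    override fun deserialize(message: ByteArray): Trace.APITrace {
        return Trace.APITrace.parseFrom(message)
    }

    // The stream is unbounded, so no element marks its end.
    override fun isEndOfStream(nextElement: Trace.APITrace): Boolean {
        return false
    }

    override fun serialize(element: Trace.APITrace): ByteArray {
        return element.toByteArray()
    }
}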


Writing to Avro from pyflink

2021-04-23 Thread Edward Yang
I've been trying to write to the Avro format with pyflink 1.12.2 on Ubuntu.
I've tested my code with an iterator writing to CSV, and everything works as
expected. Reading through the Flink documentation, I see that I should add
jar dependencies to work with Avro. I downloaded three jar files that I
believe are required for Avro and registered them like so:

import os

table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar",
)
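
A hedged sketch of building that value more defensively: on Linux `os.getcwd()` already starts with `/`, so the f-string above yields `file:////...` with four slashes, which may or may not be tolerated. The helper below (a hypothetical name, not part of pyflink) uses `pathlib` to produce well-formed `file://` URLs and fails early if a jar is missing. It is not claimed to resolve the NoClassDefFoundError by itself.

```python
from pathlib import Path
from typing import Iterable


def pipeline_jars_value(jar_paths: Iterable[str]) -> str:
    """Join jar paths into a semicolon-separated list of file:// URLs.

    Raises FileNotFoundError if any of the jars does not exist, so a typo in
    a path is reported before the job is submitted.
    """
    urls = []
    for jar in jar_paths:
        path = Path(jar).resolve()  # as_uri() requires an absolute path
        if not path.is_file():
            raise FileNotFoundError(f"jar not found: {path}")
        urls.append(path.as_uri())
    return ";".join(urls)
```

The result would be passed as the second argument of `set_string("pipeline.jars", ...)` in place of the hand-written f-string.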

I suspect I'm not loading the jar files correctly, but it's unclear what
I'm supposed to do as I'm not familiar with java and when I switch the sink
format to avro I get some unexpected errors:

Py4JJavaError: An error occurred while calling o746.executeInsert.
: java.lang.NoClassDefFoundError: org/apache/avro/io/DatumWriter
at 
org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:71)
at 
org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:61)
at 
org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:373)
at 
org.apache.flink.table.filesystem.FileSystemTableSink.createOutputFormatFactory(FileSystemTableSink.java:365)
at 
org.apache.flink.table.filesystem.FileSystemTableSink.createBatchSink(FileSystemTableSink.java:163)
at 
org.apache.flink.table.filesystem.FileSystemTableSink.consume(FileSystemTableSink.java:139)
at 
org.apache.flink.table.filesystem.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:134)
at 
org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:87)
at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:42)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:42)
at 
org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:86)
at 
org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:85)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:85)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
at 
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: org.apache.avro.io.DatumWriter
at 

FLINK Invocation error

2021-04-23 Thread Vijayendra Yadav
Hi Team,

While restarting the Flink application from a checkpoint, I intermittently get
the following error. It does not affect job submission or functionality, but I
am still wondering what the cause and the fix could be.

*RUN Command:*

/usr/lib/flink/bin/flink run \
  -s s3://bucket-app/flink/checkpoint/app/0c9be9b65962e068b6b138ed81f7ae14/chk-13229/ \
  -c com.comp.App \
  -m yarn-cluster \
  -yjm 4096m \
  -ytm 6144m \
  -ynm flink-app \
  -yt ${app_install_path}/conf \
  ${app_install_path}/*.jar \
  --conffile ${app_install_path}/application.properties \
  --app App


*ERROR Messages:*

*Job has been submitted with JobID e510e34928101ed53cb08df6d3d29f69*
13:00:35.488 [main] ERROR org.apache.flink.client.cli.CliFrontend - Error
while running the command.
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error:
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
complete the operation. Number of retries has been exhausted.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at java.security.AccessController.doPrivileged(Native Method)
~[?:1.8.0_265]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_265]
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
[hadoop-common-2.10.0-amzn-0.jar:?]
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
[flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
complete the operation. Number of retries has been exhausted.
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
~[?:1.8.0_265]
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
~[?:1.8.0_265]
at
org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at com.directv.dis.DisFlinkService.execute(DisFlinkService.java:73)
~[?:?]
at
com.directv.dis.DisFlinkEmrApplication.main(DisFlinkEmrApplication.java:38)
~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_265]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_265]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_265]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
... 11 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
Could not complete the operation. Number of retries has been exhausted.
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:302)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
~[?:1.8.0_265]
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
~[?:1.8.0_265]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_265]
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
~[?:1.8.0_265]
at
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:342)

Re: MemoryStateBackend Issue

2021-04-23 Thread Milind Vaidya
Hi Matthias,

Yeah, you are right. I am canceling the job, so a new job with a new job ID
is created, and it does not respect the previous checkpoint. I observed the
same behaviour even with the local FS backend.

Is there any way to simulate a job failure locally?

As far as config is concerned, I have not configured any backend in the
conf file, so it defaults to the MemoryStateBackend.
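
To illustrate the distinction Matthias describes, here is a toy model in plain Python. It is not Flink code and all names are hypothetical: recovery after a failure inside the same job rolls back to the last completed checkpoint, while a cancelled and resubmitted job starts with empty state unless a snapshot is handed to it explicitly.

```python
from typing import Optional


class ToyJob:
    """Toy model of a message-counting job with periodic checkpoints."""

    def __init__(self, restore_from: Optional[int] = None,
                 checkpoint_every: int = 500):
        # A new job starts from zero unless a snapshot is passed in explicitly.
        self.count = restore_from if restore_from is not None else 0
        self.checkpoint_every = checkpoint_every
        self.last_checkpoint = restore_from

    def process(self, n_messages: int) -> None:
        for _ in range(n_messages):
            self.count += 1
            if self.count % self.checkpoint_every == 0:
                # Corresponds to the "Snapshot the state N" log line.
                self.last_checkpoint = self.count

    def fail_and_recover(self) -> None:
        """Recovery within the same job: roll back to the last checkpoint."""
        self.count = self.last_checkpoint or 0
```

In this model, `ToyJob()` after a cancel starts at 0 again, which matches the "Triggering checkpoint 1" observation, while `ToyJob(restore_from=...)` plays the role of resuming from a savepoint. In an actual Flink job, a common way to provoke a failure locally is to throw an exception from a user function while a restart strategy is configured.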

Thanks,
Milind



On Fri, Apr 23, 2021 at 12:32 AM Matthias Pohl 
wrote:

> One additional question: How did you stop and restart the job? The
> behavior you're expecting should work with stop-with-savepoint. Cancelling
> the job and then just restarting it wouldn't work. The latter approach
> would lead to a new job being created.
>
> Best,
> Matthias
>
> On Thu, Apr 22, 2021 at 3:12 PM Matthias Pohl 
> wrote:
>
>> Hi Milind,
>> I bet someone else might have a faster answer. But could you provide the
>> logs and config to get a better understanding of what your issue is?
>> In general, the state is maintained even in cases where a TaskManager
>> fails.
>>
>> Best,
>> Matthias
>>
>> On Thu, Apr 22, 2021 at 5:11 AM Milind Vaidya  wrote:
>>
>>> Hi
>>>
>>> I see MemoryStateBackend being used in TM Log
>>>
>>> org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend
>>> has been configured, using default (Memory / JobManager)
>>> MemoryStateBackend (data in heap memory / checkpoints to JobManager)
>>> (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
>>> maxStateSize: 5242880)
>>>
>>>
>>>
>>> I am logging checkpointed value which is just message count
>>>
>>> Snapshot the state 500
>>> Snapshot the state 1000
>>>
>>>
>>> When I restart the job i.e. new TM but the job manager is same I see
>>>
>>> Snapshot the state 500
>>>
>>> In the JM logs I see following entries
>>>
>>> Triggering checkpoint 1
>>> Triggering checkpoint 2
>>>
>>> After restarting job hence new TM
>>>
>>> Triggering checkpoint 1
>>>
>>> As per my understanding JM should hold the checkpointed state across
>>> TM ? Am I correct?
>>>
>>> I have not configured anything special and using default. Do I need to
>>> add any setting to make it work ?
>>> I want to maintain message count across the TMs.
>>>
>>
>


Too many checkpoint folders kept for externalized retention.

2021-04-23 Thread John Smith
Hi, I'm running 1.10.0.

Just curious: is this specific to externalized retention, or does it apply to
checkpointing in general?

My checkpoint folder contains thousands of chk-x folders.

With default checkpointing, or with externalized checkpointing set to NONE,
does the number of chk- folders grow indefinitely until the job is killed, or
is only a certain number retained?

Thanks


Re: Official flink java client

2021-04-23 Thread gaurav kulkarni
Thanks for the response, folks! I plan to use the client mostly for monitoring
the status of jobs, and probably to trigger savepoints too. I may extend it in
the future to submit jobs. Given that RestClusterClient is not officially
supported, I will probably build something myself. I agree with Flavio: it
would be great if an official client were available or if the client could be
generated automatically.
Thanks,
Gaurav
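
For the monitoring and savepoint use case, a minimal standard-library sketch against the REST API mentioned in this thread could look as follows. The endpoint paths (`/jobs/overview`, `/jobs/<jobid>/savepoints`) and the JSON field names are written from memory of the REST API documentation and should be verified against the Flink version in use; the base URL and all function names are hypothetical.

```python
import json
from typing import Dict
from urllib import request


def jobs_overview_url(base_url: str) -> str:
    """URL of the job overview endpoint for a given REST base URL."""
    return base_url.rstrip("/") + "/jobs/overview"


def job_states(overview_json: str) -> Dict[str, str]:
    """Map job id to state from a /jobs/overview response body."""
    payload = json.loads(overview_json)
    return {job["jid"]: job["state"] for job in payload.get("jobs", [])}


def savepoint_request(base_url: str, job_id: str,
                      target_directory: str) -> request.Request:
    """Build (but do not send) a POST request that triggers a savepoint."""
    body = json.dumps(
        {"target-directory": target_directory, "cancel-job": False}
    ).encode("utf-8")
    return request.Request(
        base_url.rstrip("/") + f"/jobs/{job_id}/savepoints",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def fetch_job_states(base_url: str, timeout_s: float = 10.0) -> Dict[str, str]:
    """Query the cluster and return the state of every job it reports."""
    with request.urlopen(jobs_overview_url(base_url), timeout=timeout_s) as resp:
        return job_states(resp.read().decode("utf-8"))
```

Keeping URL construction and response parsing separate from the network call makes the pieces easy to unit-test without a running cluster.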
On Friday, April 23, 2021, 06:18:35 AM PDT, Flavio Pompermaier wrote:

Yes, that's a known risk. Indeed, it would be awesome if the REST API were
also published in a format that allows automatic client generation (like
Swagger or OpenAPI). Releasing an official client could be another option.
I find it very annoying to write a client from scratch.
I'll continue to use RestClusterClient as long as it works.

On Fri, Apr 23, 2021 at 2:48 PM Yun Gao wrote:

Hi Flavio,
Got that. From my point of view, RestClusterClient might not be considered
public API and might change between versions, so care is needed when upgrading.
Best,
Yun

--Original Mail --
Sender: Flavio Pompermaier
Send Date: Fri Apr 23 16:10:05 2021
Recipients: Yun Gao
CC: gaurav kulkarni, User
Subject: Re: Re: Official flink java client

Obviously I could write a Java client from scratch that interfaces with the
provided REST API, but why, if I can reuse something that already exists?
Usually I interface with REST APIs using auto-generated clients (if the APIs
are exposed via Swagger or OpenAPI).
If that's not an option, writing a REST client from scratch is something I
try to avoid as much as I can.
Best,
Flavio

On Fri, Apr 23, 2021 at 9:55 AM Yun Gao wrote:

Hi Flavio,
Many thanks for the explanation. Maybe another option is to have a look at
the HTTP REST API [1]? Flink provides an official HTTP API to submit jar jobs
and query job status, which might help.
Best,
Yun
[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html

--Original Mail --
Sender: Flavio Pompermaier
Send Date: Fri Apr 23 15:25:55 2021
Recipients: Yun Gao
CC: gaurav kulkarni, User
Subject: Re: Official flink java client

I also interface with Flink clusters using REST in order to avoid many
annoying problems (due to dependency conflicts, classpath or env variables).
I use an extended version of the RestClusterClient that you can reuse if you
want to.
It is available at [1] and adds some missing methods to the default Flink
version (I also had to copy that class and modify the visibility of some
fields in order to enable the extension).
Officially the Flink RestClusterClient is meant for internal use only, but it
actually works very well.
Best,
Flavio
[1]
https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java

On Fri, Apr 23, 2021 at 5:10 AM Yun Gao wrote:

Hi gaurav,
Logically, the Flink client is embedded in the StreamExecutionEnvironment, and
users can use the StreamExecutionEnvironment to execute their jobs. Could you
share more about why you want to use the client directly?
Best,
Yun

--Original Mail --
Sender: gaurav kulkarni
Send Date: Fri Apr 23 10:14:08 2021
Recipients: User
Subject: Official flink java client

Hi,
Is there any official Flink client in Java available? I came across
RestClusterClient, but I am not sure if it's official. I can create my own
client, but just wanted to check if there is anything official available
already that I can leverage.
Thanks,
Gaurav


Re: Correctly serializing "Number" as state in ProcessFunction

2021-04-23 Thread Miguel Araújo
Thanks for your replies. I agree this is a somewhat general problem.
I posted it here as I was trying to register the valid subclasses in Kryo
but I couldn't get the message to go away, i.e., everything worked
correctly but there was the complaint that GenericType serialization was
being used.

This is how I was registering these types:

env.getConfig.registerKryoType(classOf[java.lang.Integer])
env.getConfig.registerKryoType(classOf[java.lang.Double])

and this is the message I got on every event:

flink-task-manager_1  | 2021-04-23 16:48:29.274 [Processor Function 1
(1/2)#0] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - No
fields were detected for class java.lang.Number so it cannot be used as a
POJO type and must be processed as GenericType. Please read the Flink
documentation on "Data Types & Serialization" for details of the effect on
performance.

In the meantime, I've changed my approach to reuse a protobuf type I
already had as part of my input event.

Once again, thanks for your replies because they gave me the right
perspective.
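
As an illustration of the point Arvid makes below about telling subtypes apart without writing a class name, here is a small plain-Python sketch (not a Flink serializer; all names are hypothetical) that encodes either an integer or a double with a one-byte tag followed by eight bytes of payload.

```python
import struct
from typing import Union

TAG_INT = 0     # payload is a 64-bit signed integer
TAG_DOUBLE = 1  # payload is a 64-bit IEEE 754 double


def encode_number(value: Union[int, float]) -> bytes:
    """Encode an int or float as 1 tag byte + 8 payload bytes (big-endian)."""
    if isinstance(value, bool):
        raise TypeError("bool is not supported")
    if isinstance(value, int):
        return struct.pack(">bq", TAG_INT, value)
    if isinstance(value, float):
        return struct.pack(">bd", TAG_DOUBLE, value)
    raise TypeError(f"unsupported type: {type(value).__name__}")


def decode_number(data: bytes) -> Union[int, float]:
    """Inverse of encode_number: dispatch on the leading tag byte."""
    tag = data[0]
    if tag == TAG_INT:
        return struct.unpack(">bq", data)[1]
    if tag == TAG_DOUBLE:
        return struct.unpack(">bd", data)[1]
    raise ValueError(f"unknown tag: {tag}")
```

The fixed set of tags limits the possible subtypes up front, which is the same trade-off as registering a closed set of classes instead of accepting any Number.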



On Wednesday, 21/04/2021 at 18:26, Arvid Heise wrote:

> Hi Miguel,
>
> as Klemens said this is a rather general problem independent of Flink: How
> do you map Polymorphism in serialization?
>
> Flink doesn't have an answer on its own, as it's discouraged (A Number can
> have arbitrary many subclasses: how do you distinguish them except by
> classname? That adds a ton of overhead.). The easiest solution in your case
> is to convert ints into double.
> Or you use Kryo which dictionary encodes the classes and also limits the
> possible subclasses.
>
> On Tue, Apr 20, 2021 at 11:13 AM Klemens Muthmann <
> klemens.muthm...@cyface.de> wrote:
>
>> Hi,
>>
>> I guess this is more of a Java Problem than a Flink Problem. If you want
>> it quick and dirty you could implement a class such as:
>>
>> public class Value {
>>     private boolean isLongSet = false;
>>     private long longValue = 0L;
>>     private boolean isIntegerSet = false;
>>     private int intValue = 0;
>>
>>     public Value(final long value) {
>>         setLong(value);
>>     }
>>
>>     public void setLong(final long value) {
>>         longValue = value;
>>         isLongSet = true;
>>     }
>>
>>     public long getLong() {
>>         if (isLongSet) {
>>             return longValue;
>>         }
>>         // Added so that the method compiles: fail loudly if no long was stored.
>>         throw new IllegalStateException("No long value has been set");
>>     }
>>
>>     // Add same methods for int
>>     // To satisfy POJO requirements you will also need to add a no-argument
>>     // constructor as well as getters and setters for the boolean flags
>> }
>>
>> I guess a cleaner solution would be possible using a custom Kryo
>> serializer as explained here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
>>
>> Regards
>>   Klemens
>>
>>
>>
>> > On 20.04.2021 at 10:34, Miguel Araújo wrote:
>> >
>> > Hi everyone,
>> >
>> > I have a ProcessFunction which needs to store different number types
>> for different keys, e.g., some keys need to store an integer while others
>> need to store a double.
>> >
>> > I tried to use java.lang.Number as the type for the ValueState, but I
>> got the expected "No fields were detected for class java.lang.Number so it
>> cannot be used as a POJO type and must be processed as GenericType."
>> >
>> > I have the feeling that this is not the right approach, but the exact
>> type to be stored is only known at runtime which makes things a bit
>> trickier. Is there a way to register these classes correctly, or Is it
>> preferable to use different ValueState's for different types?
>> >
>> > Thanks,
>> > Miguel
>>
>>


Re: Question about snapshot file

2021-04-23 Thread David Anderson
Abdullah,

ReadRidesAndFaresSnapshot [1] is an example that shows how to use the State
Processor API to display the contents of a snapshot taken while running
RidesAndFaresSolution [2].

Hopefully that will help you get started.

[1]
https://github.com/ververica/flink-training/blob/master/state-processor/src/main/java/com/ververica/flink/training/exercises/ReadRidesAndFaresSnapshot.java
[2]
https://github.com/ververica/flink-training/blob/master/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java

Best regards,
David

On Fri, Apr 23, 2021 at 3:32 PM Abdullah bin Omar <
abdullahbinoma...@gmail.com> wrote:

> Hi,
>
> Thank you for your reply.
>
> I want to read the previous snapshot (if needed) at the time of operation.
> In [1], there is a portion:
>
> DataSet<Integer> listState  = savepoint.readListState<>(
>     "my-uid",
>     "list-state",
>     Types.INT);
>
>
> Will the function savepoint.readListState<>() work for reading the
> previous snapshot? If so, is the file name of the savepoint something
> like my-uid?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> Thank you
>
>
>
>
> On Fri, Apr 23, 2021 at 1:11 AM Matthias Pohl 
> wrote:
>
>> What is it you're trying to achieve in general? The JavaDoc of
>> MetadataV2V3SerializerBase provides a description on the format of the
>> file. Theoretically, you could come up with custom code using the Flink
>> sources to parse the content of the file. But maybe, there's another way to
>> accomplish what you're trying to do.
>>
>> Matthias
>>
>> [1]
>> https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
>>
>> On Thu, Apr 22, 2021 at 7:53 PM Abdullah bin Omar <
>> abdullahbinoma...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a savepoint or checkpointed file from my task. However, the file
>>> is binary. I want to see what the file contains.
>>>
>>> How is it possible to see what information the file has (or how it is
>>> possible to make it human readable?)
>>>
>>> Thank you
>>>
>>> On Thu, Apr 22, 2021 at 10:19 AM Matthias Pohl 
>>> wrote:
>>>
>>>> Hi Abdullah,
>>>> the metadata file contains handles to the operator states of the
>>>> checkpoint [1]. You might want to have a look into the State Processor API
>>>> [2].
>>>>
>>>> Best,
>>>> Matthias
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>>>
>>>> On Thu, Apr 22, 2021 at 4:57 PM Abdullah bin Omar <
>>>> abdullahbinoma...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> (1) What does the snapshot metadata file (binary) contain? Is it
>>>>> possible to read the snapshot metadata file using Flink
>>>>> deserialization?
>>>>>
>>>>> (2) Is there any function that can be used to see the previous
>>>>> states at the time of operation?
>>>>>
>>>>> Thank you
>>>>>
>>>>
>>


Re: Question about snapshot file

2021-04-23 Thread Abdullah bin Omar
Hi,

Thank you for your reply.

I want to read the previous snapshot (if needed) at the time of operation.
In [1], there is a portion:

DataSet<Integer> listState  = savepoint.readListState<>(
    "my-uid",
    "list-state",
    Types.INT);


Will the function savepoint.readListState<>() work for reading the
previous snapshot? If so, is the file name of the savepoint something
like my-uid?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

Thank you




On Fri, Apr 23, 2021 at 1:11 AM Matthias Pohl 
wrote:

> What is it you're trying to achieve in general? The JavaDoc of
> MetadataV2V3SerializerBase provides a description on the format of the
> file. Theoretically, you could come up with custom code using the Flink
> sources to parse the content of the file. But maybe, there's another way to
> accomplish what you're trying to do.
>
> Matthias
>
> [1]
> https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
>
> On Thu, Apr 22, 2021 at 7:53 PM Abdullah bin Omar <
> abdullahbinoma...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a savepoint or checkpointed file from my task. However, the file
>> is binary. I want to see what the file contains.
>>
>> How is it possible to see what information the file has (or how it is
>> possible to make it human readable?)
>>
>> Thank you
>>
>> On Thu, Apr 22, 2021 at 10:19 AM Matthias Pohl 
>> wrote:
>>
>>> Hi Abdullah,
>>> the metadata file contains handles to the operator states of the
>>> checkpoint [1]. You might want to have a look into the State Processor API
>>> [2].
>>>
>>> Best,
>>> Matthias
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>>
>>> On Thu, Apr 22, 2021 at 4:57 PM Abdullah bin Omar <
>>> abdullahbinoma...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> (1) What does the snapshot metadata file (binary) contain? Is it
>>>> possible to read the snapshot metadata file using Flink deserialization?
>>>>
>>>> (2) Is there any function that can be used to see the previous
>>>> states at the time of operation?
>>>>
>>>> Thank you
>>>>
>>>
>


Re: Re: Re: Official flink java client

2021-04-23 Thread Flavio Pompermaier
Yes, that's a known risk. Indeed, it would be awesome if the REST API were
also published in a format that allows automatic client generation
(like Swagger or OpenAPI). Releasing an official client could be another
option. I find it very annoying to write a client from scratch.
I'll continue to use RestClusterClient as long as it works.

On Fri, Apr 23, 2021 at 2:48 PM Yun Gao  wrote:

> Hi Flavio,
>
> Got that. From my point of view, RestClusterClient might not be considered
> public API and might change between versions, so care is needed when
> upgrading.
>
> Best,
> Yun
>
>
> --Original Mail --
> *Sender:*Flavio Pompermaier 
> *Send Date:*Fri Apr 23 16:10:05 2021
> *Recipients:*Yun Gao 
> *CC:*gaurav kulkarni , User <
> user@flink.apache.org>
> *Subject:*Re: Re: Official flink java client
>
>> Obviously I could rewrite a java client from scratch that interface with
>> the provided REST API but why if I can reuse something already existing?
>> Usually I interface with REST API using auto generated clients (if APIs
>> are exposed via Swagger or OpenApi).
>> If that's not an option, writing a REST client from scratch is something
>> I try to avoid as much as I can..
>>
>> Best,
>> Flavio
>>
>> On Fri, Apr 23, 2021 at 9:55 AM Yun Gao  wrote:
>>
>>> Hi Flavio,
>>>
>>> Many thanks for the explanation. Maybe another option is to have a look
>>> at the HTTP REST API [1]? Flink provides an official HTTP API to submit
>>> jar jobs and query job status, which might help.
>>>
>>> Best,
>>> Yun
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html
>>>
>>> --Original Mail --
>>> *Sender:*Flavio Pompermaier 
>>> *Send Date:*Fri Apr 23 15:25:55 2021
>>> *Recipients:*Yun Gao 
>>> *CC:*gaurav kulkarni , User <
>>> user@flink.apache.org>
>>> *Subject:*Re: Official flink java client
>>>
 I also interface with Flink clusters using REST in order to avoid many
 annoying problems (due to dependency conflicts, classpath or env
 variables).
 I use an extended version of the RestClusterClient that you can reuse
 if you want to.
 It is available at [1] and it adds some missing methods to the default
 Flink version (I also had to copy that class and modify the visibility of
 some fields in order to enable the extension).
 Officially the Flink RestClusterClient is meant for internal
 use only, but it actually works very well.

 Best,
 Flavio

 [1]
 https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java

 On Fri, Apr 23, 2021 at 5:10 AM Yun Gao  wrote:

> Hi gaurav,
>
> Logically, the Flink client is embedded inside the StreamExecutionEnvironment,
> and users can use the StreamExecutionEnvironment to execute their jobs.
> Could you share more about why you want to use the client directly?
>
> Best,
> Yun
>
>
> --Original Mail --
> *Sender:*gaurav kulkarni 
> *Send Date:*Fri Apr 23 10:14:08 2021
> *Recipients:*User 
> *Subject:*Official flink java client
>
>> Hi,
>>
>> Is there any official Flink client in Java that's available? I came
>> across RestClusterClient, but I am not sure if it's official.
>> I can create my own client, but just wanted to check if there is anything
>> official available already that I can leverage.
>>
>> Thanks,
>> Gaurav
>>


Re: event-time window cannot become earlier than the current watermark by merging

2021-04-23 Thread Vishal Santoshi
Great, thanks for the update. The upfront filter does work, and has for the
last 24 hours, and there is no reason why it should not.

Again, I have to note that no other mailing list has been this
responsive to issues, so thank you again.
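
For readers skimming the thread, the upfront filter and its reverse predicate quoted below boil down to one comparison between the event timestamp plus the allowed lateness and the current watermark. Here is a minimal, Flink-free sketch of just that comparison; the class and method names are invented for illustration and all values are in milliseconds.

```java
// Hypothetical helper, not taken from the actual job: isolates the predicate
// used by LateEventFilter and its negation used by LateEventSideOutput.
class LatenessCheck {

    // True if the event is still within the allowed lateness (filter keeps it).
    static boolean isOnTime(long eventTimestampMs, long allowedLatenessMs, long watermarkMs) {
        return eventTimestampMs + allowedLatenessMs > watermarkMs;
    }

    // Reverse predicate: true if the event goes to the simulated side output.
    static boolean isLate(long eventTimestampMs, long allowedLatenessMs, long watermarkMs) {
        return !isOnTime(eventTimestampMs, allowedLatenessMs, watermarkMs);
    }

    public static void main(String[] args) {
        long hour = 60L * 60L * 1000L;
        long watermark = 100 * hour;
        // 90h + 24h = 114h, which is ahead of the 100h watermark.
        System.out.println(isOnTime(90 * hour, 24 * hour, watermark)); // prints "true"
        // 70h + 24h = 94h, which is behind the 100h watermark.
        System.out.println(isOnTime(70 * hour, 24 * hour, watermark)); // prints "false"
    }
}
```

Note that an event landing exactly on the boundary (timestamp plus lateness equal to the watermark) counts as late, which matches the `>` and `<=` split in the two ProcessFunctions.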



On Fri, Apr 23, 2021 at 4:34 AM Matthias Pohl 
wrote:

> After having talked to David about this issue offline, I decided to create
> a Jira ticket FLINK-22425 [1] to cover this. Thanks for reporting it on the
> mailing list, Vishal. Hopefully, the community has the chance to look into
> it.
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-22425
>
> On Fri, Apr 23, 2021 at 8:16 AM Matthias Pohl 
> wrote:
>
>> To me, it sounds strange. I would have expected it to work with
>> `allowedLateness` and `sideOutput` being defined. I'll pull in David to have a
>> look at it. Maybe he has some more insights. I haven't worked that much
>> with lateness yet.
>>
>> Matthias
>>
>> On Thu, Apr 22, 2021 at 10:57 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>>  <<  Added the Filter upfront as below, the pipe has no issues. Also
>>> metrics show that no data is being pushed through the side output and that
>>> data is not pulled from the simulated side output (below)
>>>
>>> >> Added the Filter upfront as below, the pipe has no issues. Also
>>> metrics show that no data is being pushed through the side output and that
>>> data is *now* pulled from the simulated side output, essentially the
>>> ProcessFunction with a reverse predicate to the filter ProcessFunction.
>>>
>>>
>>> On Thu, Apr 22, 2021 at 1:56 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 And when I added the filter, the exception was not thrown. So the
 sequence of events was:

 * Increased lateness from 12 (what it was initially running
 with) to 24 hours.
 * The pipe ran as desired before it blew up with the exception.
 * Masked the issue by increasing the lateness to 48 hours.
 * It blew up again, but now after the added lateness, so essentially the
 same issue; the added lateness just let the pipe run for another few hours.
 * Added the Filter upfront as below, and the pipe has no issues. Also
 metrics show that no data is being pushed through the side output and that
 data is not pulled from the simulated side output (below).


 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;

 // Forwards only the events that are still within the allowed lateness.
 public class LateEventFilter<KEY, VALUE> extends ProcessFunction<
     KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
   private static final long serialVersionUID = 1L;

   private final long allowedLateness; // milliseconds

   public LateEventFilter(long allowedLateness) {
     this.allowedLateness = allowedLateness;
   }

   @Override
   public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
       Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
     if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
       out.collect(value);
     }
   }
 }


 // Reverse predicate: forwards only the events that LateEventFilter drops.
 public class LateEventSideOutput<KEY, VALUE> extends ProcessFunction<
     KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
   private static final long serialVersionUID = 1L;

   private final long allowedLateness; // milliseconds

   public LateEventSideOutput(long allowedLateness) {
     this.allowedLateness = allowedLateness;
   }

   @Override
   public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
       Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
     if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
       out.collect(value);
     }
   }
 }



  I am using RocksDB as a backend if that helps.

 On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> Yes sir. The allowedLateNess and side output always existed.
>
> On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl 
> wrote:
>
>> You're saying that you used `allowedLateness`/`sideOutputLateData` as
>> described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
>> being added to your pipeline when running into the
>> UnsupportedOperationException issue previously?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>
>> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> As in, this is essentially doing what lateness *should* have done,
>>> and I think that is a bug. My code is now as follows. Please look at
>>> the allowedLateness on the session window.
>>>
>>> SingleOutputStreamOperator>
>>> filteredKeyedValue = keyedValue
>>> .process(new LateEventFilter(this.lateNessInMinutes*60*1000l)).name(
>>> "late_filter").uid("late_filter");
>>> SingleOutputStreamOperator>
>>> lateKeyedValue = keyedValue
>>> .process(new LateEventSideOutput(this.lateNessInMinutes*60*1000l)).
>>> name("late_data").uid("late_data");
>>> SingleOutputStreamOperator>
>>> aggregate = filteredKeyedValue

Re: Re: Re: Official flink java client

2021-04-23 Thread Yun Gao
Hi Flavio,

Got that. From my point of view, RestClusterClient might not be considered public
API and might change between versions, so you may need to be careful when
upgrading.

Best,
Yun



 --Original Mail --
Sender:Flavio Pompermaier 
Send Date:Fri Apr 23 16:10:05 2021
Recipients:Yun Gao 
CC:gaurav kulkarni , User 
Subject:Re: Re: Official flink java client

Obviously I could write a Java client from scratch that interfaces with the
provided REST API, but why, if I can reuse something that already exists?
Usually I interface with REST APIs using auto-generated clients (if the APIs are
exposed via Swagger or OpenAPI).
If that's not an option, writing a REST client from scratch is something I try
to avoid as much as I can.

Best,
Flavio
On Fri, Apr 23, 2021 at 9:55 AM Yun Gao  wrote:

Hi Flavio,

Thanks a lot for the explanation. Maybe another option is to have a look at
the HTTP REST API [1]? Flink provides an official HTTP API to submit jar jobs and
query job status, which might help.

Best,
Yun

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html
 --Original Mail --
Sender:Flavio Pompermaier 
Send Date:Fri Apr 23 15:25:55 2021
Recipients:Yun Gao 
CC:gaurav kulkarni , User 
Subject:Re: Official flink java client

I also interface with Flink clusters using REST in order to avoid many annoying
problems (due to dependency conflicts, classpath or env variables).
I use an extended version of the RestClusterClient that you can reuse if you
want to.
It is available at [1] and it adds some missing methods to the default Flink
version (I also had to copy that class and modify the visibility of some fields
in order to enable the extension).
Officially the Flink RestClusterClient is meant for internal use
only, but it actually works very well.

Best,
Flavio

[1] 
https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java
On Fri, Apr 23, 2021 at 5:10 AM Yun Gao  wrote:

Hi gaurav,

Logically, the Flink client is embedded inside the StreamExecutionEnvironment, and
users can use the StreamExecutionEnvironment to execute their jobs. Could you
share more about why you want to use the client directly?

Best,
Yun



 --Original Mail --
Sender:gaurav kulkarni 
Send Date:Fri Apr 23 10:14:08 2021
Recipients:User 
Subject:Official flink java client

Hi, 

Is there any official Flink client in Java that's available? I came across
RestClusterClient, but I am not sure if it's official. I can create my own
client, but just wanted to check if there is anything official available
already that I can leverage.

Thanks,
Gaurav
















Re: Approaches for external state for Flink

2021-04-23 Thread Raghavendar T S
Hi Oğuzhan,

Take a look at Bloom filters. They might give you some better ideas.

Links:
https://en.wikipedia.org/wiki/Bloom_filter
https://stackoverflow.com/questions/4282375/what-is-the-advantage-to-using-bloom-filters
https://redislabs.com/modules/redis-bloom/
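
To make the idea concrete, here is a rough, self-contained sketch of a Bloom filter over uuid strings. It is only an illustration: the class name, the sizing parameters and the FNV-1a based double hashing are my own choices, not something taken from the links above. A negative answer is always exact, so such a filter could sit in front of the external store and let definitely-new uuids skip the read; a positive answer can be a false positive and would still need the lookup.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

// Hypothetical, minimal Bloom filter for uuid strings (illustration only).
class UuidBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    UuidBloomFilter(int numBits, int numHashes) {
        if (numBits <= 0 || numHashes <= 0) {
            throw new IllegalArgumentException("numBits and numHashes must be positive");
        }
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // 32-bit FNV-1a hash, perturbed by a seed to derive a second hash.
    private static int fnv1a(byte[] data, int seed) {
        int hash = 0x811C9DC5 ^ seed;
        for (byte b : data) {
            hash ^= (b & 0xFF);
            hash *= 16777619;
        }
        return hash;
    }

    // Double hashing: the i-th bit index is (h1 + i * h2) mod numBits.
    private int bitIndex(byte[] data, int i) {
        int h1 = fnv1a(data, 0);
        int h2 = fnv1a(data, 0x9E3779B9);
        return Math.floorMod(h1 + i * h2, numBits);
    }

    // Records the key in the filter.
    void add(String key) {
        byte[] data = key.getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < numHashes; i++) {
            bits.set(bitIndex(data, i));
        }
    }

    // False means "definitely never added"; true means "possibly added".
    boolean mightContain(String key) {
        byte[] data = key.getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(bitIndex(data, i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        UuidBloomFilter filter = new UuidBloomFilter(1 << 20, 4);
        String uuid = "123e4567-e89b-12d3-a456-426614174000";
        System.out.println(filter.mightContain(uuid)); // prints "false": nothing added yet
        filter.add(uuid);
        System.out.println(filter.mightContain(uuid)); // prints "true": no false negatives
    }
}
```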

Thank you

On Fri, Apr 23, 2021 at 3:52 PM Oğuzhan Mangır <
sosyalmedya.oguz...@gmail.com> wrote:

> I'm trying to design a streaming flow that de-duplicates events and
> sends them to a Kafka topic.
>
> Basically, the flow looks like this:
>
> kafka (multiple topics) =>  flink (de-duplication check and event
> enrichment) => kafka (single topic)
>
> For de-duplication, I'm thinking of using Cassandra as an external state
> store. The details of my job:
>
> I have an event payload with a *uuid* field. If an event with the same
> uuid arrives again, it should be discarded. In my case, two Kafka
> topics are read. The first topic has a lot of fields, but the other topic
> has just a *uuid* field, so I have to enrich the events coming from the
> second topic using the data stored for the same uuid.
>
> Stream1: Messages read from the first topic. Read state from Cassandra
> using the *uuid*. If state exists, ignore this event and *do not* emit it
> to Kafka. If state does not exist, save this event to Cassandra,
> then emit it to Kafka.
>
> Stream2: Messages read from the second topic. Read state from Cassandra
> using the *uuid*. If state exists, check a column that indicates whether
> this event has already come from topic2. If the value of this column is
> false, enrich the event using the state and set the Cassandra column to
> true. If it is true, ignore this event because it is a duplicate.
>
> def checkDeDuplication(event: Event): Option[Event] = {
>   // look up state by the event's uuid (assumed to return an Option)
>   val state = readFromCassandra(event.uuid)
>   if (state.isDefined) None // duplicate: ignore this event
>   else {
>     saveEventToCassandra(event)
>     Some(event)
>   }
> }
>
> def checkDeDuplicationAndEnrichEvent(event: Event): Option[Event] = {
>   val state = readFromCassandra(event.uuid)
>   if (state.isEmpty) None // nothing to enrich from: ignore this event
>   else {
>     if (state.get.flag) None // already seen on topic2: ignore this event
>     else {
>       updateFlagAsTrueInCassandra(event)
>       Some(event) // enrichment from state would happen here
>     }
>   }
> }
>
>
> val stream1 = readKafkaTopic1().flatMap(checkDeDuplication(_))
> val stream2 = readKafkaTopic2().flatMap(checkDeDuplicationAndEnrichEvent(_))
> stream1.union(stream2).addSink(kafkaSink)
>
> 1- Is that a good approach?
>
> 2- Is Cassandra the right choice here? Note that the state size is very
> large and I first have to populate the state from a batch flow, so I cannot
> use internal state such as RocksDB.
>
> 3- Can I improve this logic?
>
> 4- Could there be any bottleneck in this flow? I'm thinking of using
> asyncMap functions for the state read/write operations.
>


-- 
Raghavendar T S
www.teknosrc.com


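Re: Flink job fails to consume from Kafka, cannot obtain the offset

2021-04-23 Thread wysstartgo
It looks like Flink cannot connect to Kafka. I suggest looking into the Docker network settings.

Sent from my iPhone

> On Apr 23, 2021, at 12:56 PM, Qingsheng Ren  wrote:
> 
> Hi Jacob,
> 
> Judging from the error, the Kafka consumer failed to connect to the Kafka brokers. The following steps may help with troubleshooting:
> 
> 1. Check the network connectivity between the Flink TaskManagers and the Kafka brokers.
> 2. Network connectivity between the Flink TaskManagers and the Kafka brokers does not necessarily mean that data can be consumed; the Kafka broker
> configuration may need to be changed. This article [1] may help: the vast majority of Kafka connection problems are caused by the configuration issues it describes.
> 3. Configure Log4j to set the log level of org.apache.kafka.clients.consumer to DEBUG or
> TRACE to get more information in the logs for troubleshooting.
> 
> Hope this helps!
> 
> [1] 
> https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
> 
> --
> Best Regards,
> 
> Qingsheng Ren
> On Apr 14, 2021, 12:13 PM +0800, Jacob <17691150...@163.com> wrote:
>> A Flink job consumes messages from a Kafka topic that exists in two Kafka clusters, cluster A and cluster B. The Flink
>> job consumes the topic from cluster A without any problem, but fails to start when switched to cluster B.
>> 
>> The Flink cluster is deployed with Docker in standalone mode. Cluster resources and slots are sufficient. The error log is as follows: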
>> 
>> java.lang.Exception: org.apache.kafka.common.errors.TimeoutException:
>> Timeout of 6ms expired before the position for partition Test-topic-27
>> could be determined
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of
>> 6ms expired before the position for partition Test-topic-27 could be
>> determined
>> 
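>> From searching around, most answers attribute this to insufficient slots, Kafka broker load and the like; these causes have already been ruled out.
>> 
>> Any advice would be appreciated.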
>> 
>> 
>> 
>> -
>> Thanks!
>> Jacob
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
> 




temporarily unavailable due to an ongoing leader election

2021-04-23 Thread tianxy
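Flink 1.12.2 on YARN 3.1.5: a manually triggered savepoint for the job succeeded, and starting the job again with run -s also succeeded, but accessing the Flink UI
reports "service temporarily unavailable due to an ongoing leader election".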




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Approaches for external state for Flink

2021-04-23 Thread Oğuzhan Mangır
I'm trying to design a streaming flow that de-duplicates events and
sends them to a Kafka topic.

Basically, the flow looks like this:

kafka (multiple topics) =>  flink (de-duplication check and event
enrichment) => kafka (single topic)

For de-duplication, I'm thinking of using Cassandra as an external state
store. The details of my job:

I have an event payload with a *uuid* field. If an event with the same
uuid arrives again, it should be discarded. In my case, two Kafka
topics are read. The first topic has a lot of fields, but the other topic
has just a *uuid* field, so I have to enrich the events coming from the
second topic using the data stored for the same uuid.

Stream1: Messages read from the first topic. Read state from Cassandra
using the *uuid*. If state exists, ignore this event and *do not* emit it to
Kafka. If state does not exist, save this event to Cassandra, then
emit it to Kafka.

Stream2: Messages read from the second topic. Read state from Cassandra
using the *uuid*. If state exists, check a column that indicates whether this
event has already come from topic2. If the value of this column is false,
enrich the event using the state and set the Cassandra column to true. If it
is true, ignore this event because it is a duplicate.

def checkDeDuplication(event: Event): Option[Event] = {
  // look up state by the event's uuid (assumed to return an Option)
  val state = readFromCassandra(event.uuid)
  if (state.isDefined) None // duplicate: ignore this event
  else {
    saveEventToCassandra(event)
    Some(event)
  }
}

def checkDeDuplicationAndEnrichEvent(event: Event): Option[Event] = {
  val state = readFromCassandra(event.uuid)
  if (state.isEmpty) None // nothing to enrich from: ignore this event
  else {
    if (state.get.flag) None // already seen on topic2: ignore this event
    else {
      updateFlagAsTrueInCassandra(event)
      Some(event) // enrichment from state would happen here
    }
  }
}


val stream1 = readKafkaTopic1().flatMap(checkDeDuplication(_))
val stream2 = readKafkaTopic2().flatMap(checkDeDuplicationAndEnrichEvent(_))
stream1.union(stream2).addSink(kafkaSink)

1- Is that a good approach?

2- Is Cassandra the right choice here? Note that the state size is very large
and I first have to populate the state from a batch flow, so I cannot use
internal state such as RocksDB.

3- Can I improve this logic?

4- Could there be any bottleneck in this flow? I'm thinking of using asyncMap
functions for the state read/write operations.
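
To make the two decision paths above easier to reason about, here is the same logic as a small, self-contained Java sketch. It is only an illustration under assumptions: an in-memory HashMap stands in for the Cassandra table, events are reduced to a uuid plus a payload string, and all class and method names are made up. In the real job the read-then-write would have to be atomic per uuid (for example by keying both streams by uuid, or by using a conditional write in the store), otherwise two concurrent events with the same uuid could both pass the check.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the external state store and the two stream handlers.
class DedupSketch {

    // One stored row per uuid: the topic1 payload plus the "seen on topic2" flag.
    private static final class State {
        final String payload;
        boolean seenOnTopic2;

        State(String payload) {
            this.payload = payload;
        }
    }

    private final Map<String, State> store = new HashMap<>();

    // Stream1: emit the event only the first time its uuid is seen.
    Optional<String> onTopic1(String uuid, String payload) {
        if (store.containsKey(uuid)) {
            return Optional.empty(); // duplicate: ignore
        }
        store.put(uuid, new State(payload));
        return Optional.of(payload);
    }

    // Stream2: enrich from stored state once, then treat repeats as duplicates.
    Optional<String> onTopic2(String uuid) {
        State state = store.get(uuid);
        if (state == null || state.seenOnTopic2) {
            return Optional.empty(); // no state to enrich from, or duplicate
        }
        state.seenOnTopic2 = true;
        return Optional.of(state.payload); // enriched event carries the stored fields
    }

    public static void main(String[] args) {
        DedupSketch dedup = new DedupSketch();
        System.out.println(dedup.onTopic1("u1", "full-event")); // prints "Optional[full-event]"
        System.out.println(dedup.onTopic1("u1", "full-event")); // prints "Optional.empty"
        System.out.println(dedup.onTopic2("u1"));               // prints "Optional[full-event]"
        System.out.println(dedup.onTopic2("u1"));               // prints "Optional.empty"
    }
}
```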


Unsubscribe

2021-04-23 Thread shao.hongxiao
Unsubscribe

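Re: Why can the InitializeOnMaster and FinalizeOnMaster interfaces only be used with OutputFormat and not with InputFormat?

2021-04-23 Thread kanata163
A small change to the InputOutputFormatVertex class would be enough to support this, but I don't understand why it is not supported officially. Can anyone explain?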



--
Sent from: http://apache-flink.147419.n8.nabble.com/

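Why can the InitializeOnMaster and FinalizeOnMaster interfaces only be used with OutputFormat and not with InputFormat?

2021-04-23 Thread kanata163
  As the subject says.
 
My understanding is that for file-writing OutputFormats, these two methods are used in the JobManager to delete/create directories before the job starts, and to move the files to the final directory after the job finishes so that they become visible.
 
But for an RDB-type InputFormat, for example, if these two interfaces could also be used, one could query the current number of records in the database before the job starts, which would be very helpful for building data splits for multiple channels and for adding execution progress metrics.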




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: event-time window cannot become earlier than the current watermark by merging

2021-04-23 Thread Matthias Pohl
After having talked to David about this issue offline, I decided to create
a Jira ticket FLINK-22425 [1] to cover this. Thanks for reporting it on the
mailing list, Vishal. Hopefully, the community has the chance to look into
it.

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-22425

On Fri, Apr 23, 2021 at 8:16 AM Matthias Pohl 
wrote:

> To me, it sounds strange. I would have expected it to work with
> `allowedLateness` and `sideOutput` being defined. I'll pull in David to have a
> look at it. Maybe he has some more insights. I haven't worked that much
> with lateness yet.
>
> Matthias
>
> On Thu, Apr 22, 2021 at 10:57 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>>  <<  Added the Filter upfront as below, the pipe has no issues. Also
>> metrics show that no data is being pushed through the side output and that
>> data is not pulled from the simulated side output (below)
>>
>> >> Added the Filter upfront as below, the pipe has no issues. Also
>> metrics show that no data is being pushed through the side output and that
>> data is *now* pulled from the simulated side output, essentially the
>> ProcessFunction with a reverse predicate to the filter ProcessFunction.
>>
>>
>> On Thu, Apr 22, 2021 at 1:56 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> And when I added the filter, the exception was not thrown. So the
>>> sequence of events was:
>>>
>>> * Increased lateness from 12 (what it was initially running
>>> with) to 24 hours.
>>> * The pipe ran as desired before it blew up with the exception.
>>> * Masked the issue by increasing the lateness to 48 hours.
>>> * It blew up again, but now after the added lateness, so essentially the
>>> same issue; the added lateness just let the pipe run for another few hours.
>>> * Added the Filter upfront as below, and the pipe has no issues. Also
>>> metrics show that no data is being pushed through the side output and that
>>> data is not pulled from the simulated side output (below).
>>>
>>>
>>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>>> import org.apache.flink.util.Collector;
>>>
>>> // Forwards only the events that are still within the allowed lateness.
>>> public class LateEventFilter<KEY, VALUE> extends ProcessFunction<
>>>     KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
>>>   private static final long serialVersionUID = 1L;
>>>
>>>   private final long allowedLateness; // milliseconds
>>>
>>>   public LateEventFilter(long allowedLateness) {
>>>     this.allowedLateness = allowedLateness;
>>>   }
>>>
>>>   @Override
>>>   public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>>       Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>>     if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>>       out.collect(value);
>>>     }
>>>   }
>>> }
>>>
>>>
>>> // Reverse predicate: forwards only the events that LateEventFilter drops.
>>> public class LateEventSideOutput<KEY, VALUE> extends ProcessFunction<
>>>     KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
>>>   private static final long serialVersionUID = 1L;
>>>
>>>   private final long allowedLateness; // milliseconds
>>>
>>>   public LateEventSideOutput(long allowedLateness) {
>>>     this.allowedLateness = allowedLateness;
>>>   }
>>>
>>>   @Override
>>>   public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>>       Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>>     if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
>>>       out.collect(value);
>>>     }
>>>   }
>>> }
>>>
>>>
>>>
>>>  I am using RocksDB as a backend if that helps.
>>>
>>> On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 Yes sir. The allowedLateNess and side output always existed.

 On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl 
 wrote:

> You're saying that you used `allowedLateness`/`sideOutputLateData` as
> described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
> being added to your pipeline when running into the
> UnsupportedOperationException issue previously?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>
> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> As in, this is essentially doing what lateness *should* have done,
>> and I think that is a bug. My code is now as follows. Please look at
>> the allowedLateness on the session window.
>>
>> SingleOutputStreamOperator>
>> filteredKeyedValue = keyedValue
>> .process(new LateEventFilter(this.lateNessInMinutes*60*1000l)).name(
>> "late_filter").uid("late_filter");
>> SingleOutputStreamOperator>
>> lateKeyedValue = keyedValue
>> .process(new LateEventSideOutput(this.lateNessInMinutes*60*1000l)).
>> name("late_data").uid("late_data");
>> SingleOutputStreamOperator>
>> aggregate = filteredKeyedValue
>> .filter((f) -> f.key != null && f.timedValue.getEventTime() != null).
>> keyBy(value -> value.getKey())
>> .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>> .allowedLateness(Time.minutes(lateNessInMinutes)).sideOutputLateData(
>> lateOutputTag)
>> .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>> .aggregate(new SortAggregate(),
>> new SessionIdProcessWindowFunction(this.gapInMinutes,
>> 

Re: Re: Official flink java client

2021-04-23 Thread Flavio Pompermaier
Obviously I could write a Java client from scratch that interfaces with
the provided REST API, but why, if I can reuse something that already exists?
Usually I interface with REST APIs using auto-generated clients (if the APIs are
exposed via Swagger or OpenAPI).
If that's not an option, writing a REST client from scratch is something I
try to avoid as much as I can.

Best,
Flavio

On Fri, Apr 23, 2021 at 9:55 AM Yun Gao  wrote:

> Hi Flavio,
>
> Thanks a lot for the explanation. Maybe another option is to have a look
> at the HTTP REST API [1]? Flink provides an official HTTP API to submit jar
> jobs and query job status, which might help.
>
> Best,
> Yun
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html
>
> --Original Mail --
> *Sender:*Flavio Pompermaier 
> *Send Date:*Fri Apr 23 15:25:55 2021
> *Recipients:*Yun Gao 
> *CC:*gaurav kulkarni , User <
> user@flink.apache.org>
> *Subject:*Re: Official flink java client
>
>> I also interface with Flink clusters using REST in order to avoid many
>> annoying problems (due to dependency conflicts, classpath or env variables).
>> I use an extended version of the RestClusterClient that you can reuse if
>> you want to.
>> It is available at [1] and it adds some missing methods to the default
>> Flink version (I also had to copy that class and modify the visibility of
>> some fields in order to enable the extension).
>> Officially the Flink RestClusterClient is meant for internal
>> use only, but it actually works very well.
>>
>> Best,
>> Flavio
>>
>> [1]
>> https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java
>>
>> On Fri, Apr 23, 2021 at 5:10 AM Yun Gao  wrote:
>>
>>> Hi gaurav,
>>>
>>> Logically, the Flink client is embedded inside the StreamExecutionEnvironment,
>>> and users can use the StreamExecutionEnvironment to execute their jobs.
>>> Could you share more about why you want to use the client directly?
>>>
>>> Best,
>>> Yun
>>>
>>>
>>> --Original Mail --
>>> *Sender:*gaurav kulkarni 
>>> *Send Date:*Fri Apr 23 10:14:08 2021
>>> *Recipients:*User 
>>> *Subject:*Official flink java client
>>>
 Hi,

 Is there any official Flink client in Java that's available? I came
 across RestClusterClient, but I am not sure if it's official.
 I can create my own client, but just wanted to check if there is anything
 official available already that I can leverage.

 Thanks,
 Gaurav


 





Re: Re: Official flink java client

2021-04-23 Thread Yun Gao
Hi Flavio,

Thanks a lot for the explanation. Maybe another option is to have a look at
the HTTP REST API [1]? Flink provides an official HTTP API to submit jar jobs and
query job status, which might help.

Best,
Yun

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html
 --Original Mail --
Sender:Flavio Pompermaier 
Send Date:Fri Apr 23 15:25:55 2021
Recipients:Yun Gao 
CC:gaurav kulkarni , User 
Subject:Re: Official flink java client

I also interface with Flink clusters using REST in order to avoid many annoying
problems (due to dependency conflicts, classpath or env variables).
I use an extended version of the RestClusterClient that you can reuse if you
want to.
It is available at [1] and it adds some missing methods to the default Flink
version (I also had to copy that class and modify the visibility of some fields
in order to enable the extension).
Officially the Flink RestClusterClient is meant for internal use
only, but it actually works very well.

Best,
Flavio

[1] 
https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java
On Fri, Apr 23, 2021 at 5:10 AM Yun Gao  wrote:

Hi gaurav,

Logically, the Flink client is embedded inside the StreamExecutionEnvironment, and
users can use the StreamExecutionEnvironment to execute their jobs. Could you
share more about why you want to use the client directly?

Best,
Yun



 --Original Mail --
Sender:gaurav kulkarni 
Send Date:Fri Apr 23 10:14:08 2021
Recipients:User 
Subject:Official flink java client

Hi, 

Is there any official Flink client in Java that's available? I came across
RestClusterClient, but I am not sure if it's official. I can create my own
client, but just wanted to check if there is anything official available
already that I can leverage.

Thanks,
Gaurav
















Re: Debezium CDC | OOM

2021-04-23 Thread Matthias Pohl
Got it. Thanks for clarifying.

On Fri, Apr 23, 2021 at 6:36 AM Ayush Chauhan 
wrote:

> Hi Matthias,
>
> I am using RocksDB as a state backend. I think the iceberg sink is not
> able to propagate back pressure to the source which is resulting in OOM for
> my CDC pipeline.
> Please refer to this - https://github.com/apache/iceberg/issues/2504
>
>
>
> On Thu, Apr 22, 2021 at 8:44 PM Matthias Pohl 
> wrote:
>
>> Hi Ayush,
>> Which state backend have you configured [1]? Have you considered trying
>> out RocksDB [2]? RocksDB might help with persisting at least keyed state.
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#choose-the-right-state-backend
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
>>
>> On Thu, Apr 22, 2021 at 7:52 AM Ayush Chauhan 
>> wrote:
>>
>>> Hi,
>>> I am using Flink CDC to stream CDC changes into an Iceberg table. When I
>>> first run the Flink job for a topic that has all the data for a table, it
>>> runs out of heap memory because Flink tries to load all the data during my
>>> 15-minute checkpointing interval. Right now, the only solution I have is to
>>> pass *-ytm 8192 -yjm 2048m* for a table with 10M rows and then reduce it
>>> after Flink has consumed all the data. Is there a way to tell the Flink CDC
>>> code to trigger a checkpoint or throttle the consumption speed (I think
>>> backpressure should have handled this)?
>>>
>>> --
>>>  Ayush Chauhan
>>>  Software Engineer | Data Platform
>>>
>>
>
> --
>  Ayush Chauhan
>  Software Engineer | Data Platform
>


Re: MemoryStateBackend Issue

2021-04-23 Thread Matthias Pohl
One additional question: How did you stop and restart the job? The behavior
you're expecting should work with stop-with-savepoint. Cancelling the job
and then just restarting it wouldn't work. The latter approach would lead
to a new job being created.

Best,
Matthias

On Thu, Apr 22, 2021 at 3:12 PM Matthias Pohl 
wrote:

> Hi Milind,
> I bet someone else might have a faster answer. But could you provide the
> logs and config to get a better understanding of what your issue is?
> In general, the state is maintained even in cases where a TaskManager
> fails.
>
> Best,
> Matthias
>
> On Thu, Apr 22, 2021 at 5:11 AM Milind Vaidya  wrote:
>
>> Hi
>>
>> I see MemoryStateBackend being used in TM Log
>>
>> org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend
>> has been configured, using default (Memory / JobManager)
>> MemoryStateBackend (data in heap memory / checkpoints to JobManager)
>> (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
>> maxStateSize: 5242880)
>>
>>
>>
>> I am logging checkpointed value which is just message count
>>
>> Snapshot the state 500
>> Snapshot the state 1000
>>
>>
>> When I restart the job (i.e. a new TM, but the same JobManager), I see
>>
>> Snapshot the state 500
>>
>> In the JM logs I see following entries
>>
>> Triggering checkpoint 1
>> Triggering checkpoint 2
>>
>> After restarting job hence new TM
>>
>> Triggering checkpoint 1
>>
>> As per my understanding, the JM should hold the checkpointed state across
>> TMs. Am I correct?
>>
>> I have not configured anything special and am using the defaults. Do I need
>> to add any setting to make it work?
>> I want to maintain the message count across the TMs.
>>
>

-- 

Matthias Pohl | Engineer



Re: Official flink java client

2021-04-23 Thread Flavio Pompermaier
I also interface to Flink clusters using REST in order to avoid many
annoying problems (due to dependency conflicts, classpath or env variables).
I use an extended version of the RestClusterClient that you can reuse if
you want to.
It is available at [1] and it adds some missing methods to the default Flink
version (I also had to copy that class and modify the visibility of some
fields in order to enable the extension).
Officially, the Flink RestClusterClient is meant for internal use
only, but it actually works very well.

Best,
Flavio

[1]
https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java
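
To illustrate the REST-based approach without any Flink dependency on the classpath, here is a minimal sketch that uses only the JDK 11 HTTP client. It is not the RestClusterClient and not the extended class linked above; the class and method names are hypothetical, and the base URL http://localhost:8081 is an assumption about where the JobManager's REST endpoint listens.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FlinkRestJobsOverview {

    // GET /jobs/overview returns the jobs known to the cluster as JSON.
    static HttpRequest jobsOverviewRequest(String restBase) {
        return HttpRequest.newBuilder()
                .uri(URI.create(restBase + "/jobs/overview"))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    // Sends the request and returns the raw JSON body; needs a running cluster.
    static String fetchJobsOverview(String restBase) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
                client.send(jobsOverviewRequest(restBase), HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        if (args.length > 0) {
            // Contact the cluster only when a base URL is passed explicitly.
            System.out.println(fetchJobsOverview(args[0]));
        } else {
            // Otherwise just show which URL would be queried.
            System.out.println(jobsOverviewRequest("http://localhost:8081").uri());
        }
    }
}
```

Talking plain HTTP keeps the caller free of the dependency and classpath conflicts mentioned above, at the cost of parsing the JSON responses yourself.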

On Fri, Apr 23, 2021 at 5:10 AM Yun Gao  wrote:

> Hi gaurav,
>
> Logically, the Flink client is embedded in the StreamExecutionEnvironment, and
> users can use the
> StreamExecutionEnvironment to execute their jobs. Could you share more
> about why you
> want to use the client directly?
>
> Best,
> Yun
>
>
> --Original Mail --
> *Sender:*gaurav kulkarni 
> *Send Date:*Fri Apr 23 10:14:08 2021
> *Recipients:*User 
> *Subject:*Official flink java client
>
>> Hi,
>>
>> Is there any official Flink client in Java that's available? I came
>> across RestClusterClient, but I am not sure if it's official.
>> I can create my own client, but just wanted to check if there is anything
>> official available already that I can leverage.
>>
>> Thanks,
>> Gaurav
>>


Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-23 Thread Matthias Pohl
Another few questions: Have you had the chance to monitor/profile the
memory usage? What section of the memory was used excessively?
Additionally, could @dhanesh arole's proposal
solve your issue?

Matthias

On Fri, Apr 23, 2021 at 8:41 AM Matthias Pohl 
wrote:

> Thanks for sharing these details. Looking into FLINK-14952 [1] (which
> introduced this option) and the related mailing list thread [2], it feels
> like your issue is quite similar to what is described in there even though
> it sounds like this issue is mostly tied to bounded jobs. But I'm not sure
> what is happening under the hood. I guess you tried the option already?
> Have you had the chance to profile memory? I'm pulling in Piotr and
> Zhijiang. Maybe they have more insights on that matter.
>
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-14952
> [2]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/CoGroup-SortMerger-performance-degradation-from-1-6-4-1-9-1-tp31082p31389.html
>
> On Fri, Apr 23, 2021 at 4:53 AM 马阳阳  wrote:
>
>> Hi Matthias,
>> We have “solved” the problem by tuning the join. But I still try to
>> answer the questions, hoping this will help.
>>
>> * What is the option you're referring to for the bounded shuffle? That
>> might help to understand what streaming mode solution you're looking for.
>>
>> taskmanager.network.blocking-shuffle.type "file" String The blocking
>> shuffle type, either "mmap" or "file". The "auto" means selecting the
>> property type automatically based on system memory architecture (64 bit for
>> mmap and 32 bit for file). Note that the memory usage of mmap is not
>> accounted by configured memory limits, but some resource frameworks like
>> yarn would track this memory usage and kill the container once memory
>> exceeding some threshold. Also note that this option is experimental and
>> might be changed future.
>> * What does the job graph look like? Are you assuming that it's due to a
>> shuffling operation? Could you provide the logs to get a better
>> understanding of your case?
>>The graph is join of three streams. And we use rocksdb as the
>> statebackend. I think the crash is due to rocksdb. And I could not get the
>> logs (because some misconfiguration, which caused the logs are empty).
>> * Do you observe the same memory increase for other TaskManager nodes?
>>After one tm is killed, the job failed. So I didn’t see the exactly
>> same memory increase for other tms. But I think other tms would have
>> similiar behavior because the data sizes they processed are almost the same.
>> * Are you expecting to reach the memory limits considering that you
>> mentioned a "big state size"? Would increasing the memory limit be an
>> option or do you fear that it's caused by some memory leak?
>>   By change the tm process memory to 18GB instead of 12GB, it didn’t help.
>>
>> By the answers I provided, I think maybe we should figure out why rocksdb
>> overused virtual memory, and caused yarn to kill the container.
>>
>> On Fri, Apr 16, 2021 at 10:24 AM 马阳阳  wrote:
>>
>>> The Flink version we used is 1.12.0.
>>>
>>> 马阳阳
>>> ma_yang_y...@163.com
>>>
>>> 
>>> 签名由 网易邮箱大师  定制
>>>
>>> On 04/16/2021 16:07,马阳阳 
>>> wrote:
>>>
>>> Hi, community,
>>> When running a Flink streaming job with big state size, one task manager
>>> process was killed by the yarn node manager. The following log is from the
>>> yarn node manager:
>>>
>>> 2021-04-16 11:51:23,013 WARN
>>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>>> Container
>>> [pid=521232,containerID=container_e157_1618223445363_16943_01_10] is
>>> running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0
>>> GB of 12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used.
>>> Killing container.
>>>
>>> When searching for a solution to this problem, I found that there is an
>>> option for this that works for bounded shuffle. So is there a way to get
>>> rid of this in streaming mode?
>>>
>>> PS:
>>> memory related options:
>>> taskmanager.memory.process.size:12288m
>>> taskmanager.memory.managed.fraction:0.7
>>>
>>>
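
For the memory options quoted above, it can help to spell out how the 12288m process size is split up. The sketch below is back-of-the-envelope arithmetic only. It assumes the Flink 1.12 defaults for the options that are not mentioned in the thread (256 MB JVM metaspace; JVM overhead of 10% of the process size, clamped to the range 192 MB to 1 GB), and the class and method names are hypothetical.

```java
public class TaskManagerMemoryBreakdown {

    // Assumed defaults, not taken from the thread:
    static final long METASPACE_MB = 256;         // taskmanager.memory.jvm-metaspace.size
    static final double OVERHEAD_FRACTION = 0.1;  // taskmanager.memory.jvm-overhead.fraction
    static final long OVERHEAD_MIN_MB = 192;      // taskmanager.memory.jvm-overhead.min
    static final long OVERHEAD_MAX_MB = 1024;     // taskmanager.memory.jvm-overhead.max

    // JVM overhead is a fraction of the process size, clamped to [min, max].
    static long jvmOverheadMb(long processMb) {
        long raw = Math.round(processMb * OVERHEAD_FRACTION);
        return Math.max(OVERHEAD_MIN_MB, Math.min(OVERHEAD_MAX_MB, raw));
    }

    // Total Flink memory = process size - metaspace - JVM overhead.
    static long totalFlinkMemoryMb(long processMb) {
        return processMb - METASPACE_MB - jvmOverheadMb(processMb);
    }

    // Managed memory is a fraction of the total Flink memory.
    static long managedMemoryMb(long processMb, double managedFraction) {
        return Math.round(totalFlinkMemoryMb(processMb) * managedFraction);
    }

    public static void main(String[] args) {
        long processMb = 12288;       // taskmanager.memory.process.size
        double managedFraction = 0.7; // taskmanager.memory.managed.fraction

        long totalFlink = totalFlinkMemoryMb(processMb);
        long managed = managedMemoryMb(processMb, managedFraction);
        System.out.println("JVM overhead (MB):       " + jvmOverheadMb(processMb));
        System.out.println("Total Flink memory (MB): " + totalFlink);
        System.out.println("Managed memory (MB):     " + managed);
        System.out.println("Remaining for heap, network, framework (MB): " + (totalFlink - managed));

        // Overshoot reported by the YARN NodeManager log, converted to MiB.
        System.out.println("Overshoot (MiB): " + 19562496 / (1024.0 * 1024.0));
    }
}
```

Under these assumptions the container was killed for exceeding its limit by less than 20 MiB, so the 1 GB JVM overhead budget, which has to absorb native allocations that Flink does not account for, is the first place to look. Whether raising that budget would have helped here is not something the thread confirms.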


Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-23 Thread Matthias Pohl
Thanks for sharing these details. Looking into FLINK-14952 [1] (which
introduced this option) and the related mailing list thread [2], it feels
like your issue is quite similar to what is described in there even though
it sounds like this issue is mostly tied to bounded jobs. But I'm not sure
what is happening under the hood. I guess you tried the option already?
Have you had the chance to profile memory? I'm pulling in Piotr and
Zhijiang. Maybe they have more insights on that matter.

Matthias

[1] https://issues.apache.org/jira/browse/FLINK-14952
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/CoGroup-SortMerger-performance-degradation-from-1-6-4-1-9-1-tp31082p31389.html

On Fri, Apr 23, 2021 at 4:53 AM 马阳阳  wrote:

> Hi Matthias,
> We have “solved” the problem by tuning the join. But I still try to answer
> the questions, hoping this will help.
>
> * What is the option you're referring to for the bounded shuffle? That
> might help to understand what streaming mode solution you're looking for.
>
> taskmanager.network.blocking-shuffle.type "file" String The blocking
> shuffle type, either "mmap" or "file". The "auto" means selecting the
> property type automatically based on system memory architecture (64 bit for
> mmap and 32 bit for file). Note that the memory usage of mmap is not
> accounted by configured memory limits, but some resource frameworks like
> yarn would track this memory usage and kill the container once memory
> exceeding some threshold. Also note that this option is experimental and
> might be changed future.
> * What does the job graph look like? Are you assuming that it's due to a
> shuffling operation? Could you provide the logs to get a better
> understanding of your case?
>The graph is join of three streams. And we use rocksdb as the
> statebackend. I think the crash is due to rocksdb. And I could not get the
> logs (because some misconfiguration, which caused the logs are empty).
> * Do you observe the same memory increase for other TaskManager nodes?
>After one tm is killed, the job failed. So I didn’t see the exactly
> same memory increase for other tms. But I think other tms would have
> similiar behavior because the data sizes they processed are almost the same.
> * Are you expecting to reach the memory limits considering that you
> mentioned a "big state size"? Would increasing the memory limit be an
> option or do you fear that it's caused by some memory leak?
>   By change the tm process memory to 18GB instead of 12GB, it didn’t help.
>
> By the answers I provided, I think maybe we should figure out why rocksdb
> overused virtual memory, and caused yarn to kill the container.
>
> On Fri, Apr 16, 2021 at 10:24 AM 马阳阳  wrote:
>
>> The Flink version we used is 1.12.0.
>>
>> 马阳阳
>> ma_yang_y...@163.com
>>
>> 
>> 签名由 网易邮箱大师  定制
>>
>> On 04/16/2021 16:07,马阳阳 
>> wrote:
>>
>> Hi, community,
>> When running a Flink streaming job with big state size, one task manager
>> process was killed by the yarn node manager. The following log is from the
>> yarn node manager:
>>
>> 2021-04-16 11:51:23,013 WARN
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>> Container
>> [pid=521232,containerID=container_e157_1618223445363_16943_01_10] is
>> running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0
>> GB of 12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used.
>> Killing container.
>>
>> When searching for a solution to this problem, I found that there is an
>> option for this that works for bounded shuffle. So is there a way to get
>> rid of this in streaming mode?
>>
>> PS:
>> memory related options:
>> taskmanager.memory.process.size:12288m
>> taskmanager.memory.managed.fraction:0.7
>>
>>


?????? ????upsert-kafka connector??????

2021-04-23 Thread op
??upsert-kafkasinkkeypartition??keyA??B??kafka,
??upsert-kafka??key??A??B??A




----
??: 
   "user-zh"



Re: event-time window cannot become earlier than the current watermark by merging

2021-04-23 Thread Matthias Pohl
To me, it sounds strange. I would have expected it to work with
`allowedLateness` and `sideOutput` being defined. I'm pulling in David to have a
look at it. Maybe he has some more insights. I haven't worked that much
with lateness, yet.
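
To make the comparison concrete, the sketch below puts the per-element predicate from the LateEventFilter quoted further down next to the window-level lateness rule as I understand it from the WindowOperator (a window counts as late once its max timestamp, i.e. end - 1, plus the allowed lateness is at or behind the watermark). The class, the method names and all numbers are made up for illustration, and the window rule is stated from memory rather than taken from this thread.

```java
public class LatenessChecks {

    // Predicate used by the quoted LateEventFilter: the element passes while
    // timestamp + allowedLateness is still ahead of the current watermark.
    static boolean passesElementFilter(long timestamp, long allowedLateness, long watermark) {
        return timestamp + allowedLateness > watermark;
    }

    // Assumed window-level rule: the window is late once
    // maxTimestamp (= windowEnd - 1) + allowedLateness <= watermark.
    static boolean isWindowLate(long windowEnd, long allowedLateness, long watermark) {
        return (windowEnd - 1) + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long gap = 10;             // session gap (made-up units)
        long allowedLateness = 5;
        long watermark = 100;
        long timestamp = 93;       // event time of the incoming element

        // A fresh session window for this element spans [timestamp, timestamp + gap).
        long windowEnd = timestamp + gap;

        System.out.println("passes element filter: "
                + passesElementFilter(timestamp, allowedLateness, watermark));
        System.out.println("window is late:        "
                + isWindowLate(windowEnd, allowedLateness, watermark));
    }
}
```

With these numbers the element filter rejects the element while the window rule would still accept its session window, so the upfront filter is the stricter of the two checks. That is consistent with the filter masking the exception, but it does not by itself explain why the window operator throws.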

Matthias

On Thu, Apr 22, 2021 at 10:57 PM Vishal Santoshi 
wrote:

>  <<  Added the Filter upfront as below, the pipe has no issues. Also
> metrics show that no data is being pushed through the sideoutput and that
> data is not pulled from the simulated sideout ( below )
>
> >> Added the Filter upfront as below, the pipe has no issues. Also
> metrics show that no data is being pushed through the sideoutput and that
> data is *now* pulled from the simulated sideout, essentially the Process
> Function with a reverse predicate to the Filter Process Function.
>
>
> On Thu, Apr 22, 2021 at 1:56 PM Vishal Santoshi 
> wrote:
>
>> And when I added the filter the Exception was not thrown. So the sequence
>> of events
>>
>> * Increased lateness from 12 ( that was what it was initially running
>> with )  to 24 hours
>> * the pipe ran as desired before it blew up with the Exception
>> * masked the issue by increasing the lateness to 48 hours.
>> * It blew up again but now after the added lateness, so essentially the
>> same issue but added lateness let the pipe run for another few hours.
>> * Added the Filter upfront as below, the pipe has no issues. Also
>> metrics show that no data is being pushed through the sideoutput and that
>> data is not pulled from the simulated sideout ( below )
>>
>>
>> public class LateEventFilter<KEY, VALUE>
>>         extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
>>     private static final long serialVersionUID = 1L;
>>
>>     long allowedLateness;
>>
>>     public LateEventFilter(long allowedLateness) {
>>         this.allowedLateness = allowedLateness;
>>     }
>>
>>     // Forwards only elements that are still within the allowed lateness.
>>     @Override
>>     public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>             Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>         if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>             out.collect(value);
>>         }
>>     }
>> }
>>
>>
>> public class LateEventSideOutput<KEY, VALUE>
>>         extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
>>     private static final long serialVersionUID = 1L;
>>
>>     long allowedLateness;
>>
>>     public LateEventSideOutput(long allowedLateness) {
>>         this.allowedLateness = allowedLateness;
>>     }
>>
>>     // Reverse predicate: forwards only elements that are past the allowed lateness.
>>     @Override
>>     public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>             Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>         if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
>>             out.collect(value);
>>         }
>>     }
>> }
>>
>>
>>
>>  I am using RocksDB as a backend if that helps.
>>
>> On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Yes sir. The allowedLateNess and side output always existed.
>>>
>>> On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl 
>>> wrote:
>>>
 You're saying that you used `allowedLateness`/`sideOutputLateData` as
 described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
 being added to your pipeline when running into the
 UnsupportedOperationException issue previously?

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

 On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> In other words, this is essentially doing what lateness *should* have done, and
> I think that is a bug. My code is now as follows. Please look at the allowedLateness
> on the session window.
>
> SingleOutputStreamOperator>
> filteredKeyedValue = keyedValue
> .process(new LateEventFilter(this.lateNessInMinutes*60*1000l)).name(
> "late_filter").uid("late_filter");
> SingleOutputStreamOperator> lateKeyedValue
> = keyedValue
> .process(new LateEventSideOutput(this.lateNessInMinutes*60*1000l)).
> name("late_data").uid("late_data");
> SingleOutputStreamOperator>
> aggregate = filteredKeyedValue
> .filter((f) -> f.key != null && f.timedValue.getEventTime() != null).
> keyBy(value -> value.getKey())
> .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
> .allowedLateness(Time.minutes(lateNessInMinutes)).sideOutputLateData(
> lateOutputTag)
> .trigger(PurgingTrigger.of(CountTrigger.of(1)))
> .aggregate(new SortAggregate(),
> new SessionIdProcessWindowFunction(this.gapInMinutes, this
> .lateNessInMinutes))
> .name("session_aggregate").uid("session_aggregate");
>
> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> I can do that, but I am not certain this is the right filter.  Can
>> you please validate. That aside I already have the lateness configured 
>> for
>> the session window ( the normal withLateNess() )  and this looks like a
>> session window was not collected and still is alive for some reason ( a
>> flink 

Re: Question about snapshot file

2021-04-23 Thread Matthias Pohl
What is it you're trying to achieve in general? The JavaDoc of
MetadataV2V3SerializerBase [1] provides a description of the format of the
file. Theoretically, you could come up with custom code using the Flink
sources to parse the content of the file. But maybe, there's another way to
accomplish what you're trying to do.
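
As a very small first step towards inspecting the file by hand, the sketch below reads only the two header ints of a metadata file. It assumes, from my reading of the Flink sources rather than from anything stated in this thread, that the file starts with a big-endian magic number (Checkpoints.HEADER_MAGIC_NUMBER, which I recall as 0x4960672d) followed by the serializer version. Please verify both against your Flink version; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;

public class MetadataHeaderPeek {

    // Assumption: value of Checkpoints.HEADER_MAGIC_NUMBER in flink-runtime.
    static final int ASSUMED_MAGIC = 0x4960672d;

    // Parses the first eight bytes as two big-endian ints: {magic, version}.
    // ByteBuffer is big-endian by default, matching DataOutputStream.writeInt.
    static int[] parseHeader(byte[] firstEightBytes) {
        ByteBuffer buffer = ByteBuffer.wrap(firstEightBytes);
        return new int[] {buffer.getInt(), buffer.getInt()};
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.out.println("usage: MetadataHeaderPeek <path-to-_metadata>");
            return;
        }
        try (InputStream in = Files.newInputStream(Paths.get(args[0]))) {
            int[] header = parseHeader(in.readNBytes(8));
            System.out.println("magic matches assumed value: " + (header[0] == ASSUMED_MAGIC));
            System.out.println("metadata format version:     " + header[1]);
        }
    }
}
```

Everything after the header follows the layout described in the JavaDoc mentioned above, so decoding it this way quickly turns into re-implementing the Flink serializer.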

Matthias

[1]
https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83

On Thu, Apr 22, 2021 at 7:53 PM Abdullah bin Omar <
abdullahbinoma...@gmail.com> wrote:

> Hi,
>
> I have a savepoint or checkpointed file from my task. However, the file is
> binary. I want to see what the file contains.
>
> How is it possible to see what information the file has (or how it is
> possible to make it human readable?)
>
> Thank you
>
> On Thu, Apr 22, 2021 at 10:19 AM Matthias Pohl 
> wrote:
>
>> Hi Abdullah,
>> the metadata file contains handles to the operator states of the
>> checkpoint [1]. You might want to have a look into the State Processor API
>> [2].
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> On Thu, Apr 22, 2021 at 4:57 PM Abdullah bin Omar <
>> abdullahbinoma...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> (1) What does the snapshot metadata file (binary) contain? Is it
>>> possible to read the snapshot metadata file using Flink deserialization?
>>>
>>> (2) Is there any function that can be used to see the previous states at
>>> the time of operation?
>>>
>>> Thank you
>>>
>>