Flink writing to HBase

2021-07-23 Thread 田磊
Hi all:
My Flink job processes data with a map operator at parallelism 36 and then writes to HBase with a sink at parallelism 2. After a few tens of thousands of records, a NullPointerException is thrown for the table object. In theory, tens of thousands of records had already been processed against the same HBase table, so the table object should not suddenly be null. If I set the sink parallelism to 1, the NPE goes away, but before even 100,000 records are written the sink becomes very slow, severe backpressure appears, the upstream processing rate drops to 0, and the job fails. In the end I changed the map operator parallelism to 24 and kept the sink parallelism at 2; there is still some backpressure, but so far the job has not failed. Can anyone explain what is going on? I cannot figure it out.
java.lang.NullPointerException
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.mutate(BufferedMutatorImpl.java:129)
at org.apache.hadoop.hbase.client.HTable.put(HTable.java:1031)
at com.chinaunicom.audit.photo.history.handle.customsink.HbaseNumberAndQualitySink.invoke(HbaseNumberAndQualitySink.java:312)
at com.chinaunicom.audit.photo.history.handle.customsink.HbaseNumberAndQualitySink.invoke(HbaseNumberAndQualitySink.java:48)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
at java.lang.Thread.run(Thread.java:745)
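For reference, a minimal sketch (not the original HbaseNumberAndQualitySink) of a custom HBase sink that gives each parallel subtask its own Connection and BufferedMutator in open() and closes them only in close(); sharing or prematurely closing a single Table/BufferedMutator across subtasks is one possible cause of an NPE like the one above. The table name, column family, and record type are assumptions for illustration.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseSinkSketch extends RichSinkFunction<String> {
    private transient Connection connection;
    private transient BufferedMutator mutator;

    @Override
    public void open(Configuration parameters) throws Exception {
        // one HBase connection and mutator per parallel sink instance
        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(hbaseConf);
        mutator = connection.getBufferedMutator(TableName.valueOf("my_table"));
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        Put put = new Put(Bytes.toBytes(value));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value));
        mutator.mutate(put);
    }

    @Override
    public void close() throws Exception {
        if (mutator != null) {
            mutator.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}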



FlinkKinesis consumer

2021-07-23 Thread Vijayendra Yadav
Hi Team,

https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.html

*" There is no perfect generic default assignment function. Default shard
to subtask assignment, which is based on hash code, may result in skew,
with some subtasks having many shards assigned and others none."*


*Question*: How can this skew be reduced? Some of the subtasks are idle while
others get double the load. I am trying to achieve a 1-shard-to-1-subtask
match.

Thanks,
Vijay
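One way to control the assignment is a custom shard assigner. Below is a hedged sketch (not an official Flink utility), assuming the 1.11-era connector API with FlinkKinesisConsumer#setShardAssigner and the KinesisShardAssigner interface: it spreads shards by the numeric suffix of their shard id instead of the default hashCode-based assignment. With parallelism equal to the shard count and contiguous shard-id suffixes this gives roughly one shard per subtask; resharded streams with gaps in the suffixes may still need a smarter mapping.

import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;

public class UniformShardAssigner implements KinesisShardAssigner {
    @Override
    public int assign(StreamShardHandle shard, int numParallelSubtasks) {
        // Kinesis shard ids look like "shardId-000000000012"; the numeric suffix
        // assigns consecutive shards round-robin over the subtasks instead of
        // relying on hashCode(), which can skew.
        String shardId = shard.getShard().getShardId();
        long suffix = Long.parseLong(shardId.substring(shardId.lastIndexOf('-') + 1));
        return (int) (suffix % numParallelSubtasks);
    }
}

// usage (assuming a FlinkKinesisConsumer instance named "consumer"):
// consumer.setShardAssigner(new UniformShardAssigner());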


Queryable State Lookup Failure

2021-07-23 Thread Sandeep khanzode
Hello,

With the default memory settings, after about 5000 records in my
FlinkKafkaConsumer and some operators in my pipeline, I get the below error:

Caused by: java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Unknown Source) ~[?:?]
at java.nio.DirectByteBuffer.<init>(Unknown Source) ~[?:?]
at java.nio.ByteBuffer.allocateDirect(Unknown Source) ~[?:?]
at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:755) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:731) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:247) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:215) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:147) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:356) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:139) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.queryablestate.network.NettyBufferPool.ioBuffer(NettyBufferPool.java:95) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.queryablestate.network.messages.MessageSerializer.writePayload(MessageSerializer.java:203) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.queryablestate.network.messages.MessageSerializer.serializeRequest(MessageSerializer.java:96) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.queryablestate.network.Client$EstablishedConnection.sendRequest(Client.java:546) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.queryablestate.network.Client.sendRequest(Client.java:159) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:336) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:295) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:241) ~[flink-dist_2.12-1.12.1.jar:1.12.1]


I read about this and tried to increase the memory settings as below, which 
took care of that problem … 

jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 2300m
taskmanager.memory.network.max: 768m
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.managed.fraction: 0.45
taskmanager.memory.network.min: 192m
taskmanager.memory.task.off-heap.size: 512m


But now I have the below issue at exactly or approximately the same point, i.e.
after about 5000 records. It doesn't matter whether I send them in a burst or
stagger them; strangely, after that limit it always blows up, i.e. at
approximately 4,500 to 5,500 records.

I am now doing multiple state lookups against the Queryable State. Previously I
did about 50% as many lookups as I do now, and I could ingest millions of
records. But simply doubling the number of lookups has caused the Queryable
State to fail.

What memory settings do I have to change to rectify this? Any help will be 
appreciated.
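If the direct-buffer OOM comes from many getKvState requests being serialized concurrently (the stack trace above shows each request allocating a direct Netty buffer), one mitigation besides raising the direct-memory limit of the JVM running the client (-XX:MaxDirectMemorySize) is to bound the number of in-flight lookups. A minimal sketch, with the proxy host/port, key and value types assumed for illustration and the state name "queryable-data" taken from this thread:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class BoundedStateLookup {
    private final QueryableStateClient client;
    private final Semaphore permits = new Semaphore(100); // cap on concurrent lookups
    private final ValueStateDescriptor<Long> descriptor =
            new ValueStateDescriptor<>("queryable-data", Types.LONG);

    public BoundedStateLookup(String proxyHost, int proxyPort) throws Exception {
        this.client = new QueryableStateClient(proxyHost, proxyPort);
    }

    public CompletableFuture<Long> lookup(JobID jobId, String key) throws InterruptedException {
        // Block the caller when 100 requests are already in flight, instead of letting
        // serialized requests pile up in direct (off-heap) Netty buffers.
        permits.acquire();
        return client
                .getKvState(jobId, "queryable-data", key, Types.STRING, descriptor)
                .whenComplete((state, error) -> permits.release())
                .thenApply(state -> {
                    try {
                        return state.value();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
    }
}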

I have also seen the BufferPool error sometimes … 


java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed 
request 67.
 Caused by: org.apache.flink.runtime.query.UnknownKvStateLocation: No 
KvStateLocation found for KvState instance with name 'queryable-data'.
at 
org.apache.flink.runtime.scheduler.SchedulerBase.requestKvStateLocation(SchedulerBase.java:839)
at 
org.apache.flink.runtime.jobmaster.JobMaster.requestKvStateLocation(JobMaster.java:554)
at jdk.internal.reflect.GeneratedMethodAccessor195.invoke(Unknown 
Source)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
at 

Re: Recover from savepoints with Kubernetes HA

2021-07-23 Thread Austin Cawley-Edwards
Great, glad it was an easy fix :) Thanks for following up!

On Fri, Jul 23, 2021 at 3:54 AM Thms Hmm  wrote:

> Finally I found the mistake. I passed the "--host 10.1.2.3" param as one
> argument. I think the savepoint argument was then not interpreted correctly or
> was ignored. It might be that "-s" was used as the value for "--host
> 10.1.2.3" and "s3p://…" as a new param, and because these are not valid
> arguments they were ignored.
>
> Not working:
>
> 23.07.2021 09:19:54.546 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Program Arguments:
>
> ...
>
> 23.07.2021 09:19:54.549 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --host 10.1.2.3
>
> 23.07.2021 09:19:54.549 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -s
>
> 23.07.2021 09:19:54.549 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> s3p://bucket/job1/savepoints/savepoint-00-1234
>
> -
>
> Working:
>
> 23.07.2021 09:19:54.546 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Program Arguments:
>
> ...
>
> 23.07.2021 09:19:54.549 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --host
>
> 23.07.2021 09:19:54.549 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 10.1.2.3
>
> 23.07.2021 09:19:54.549 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -s
>
> 23.07.2021 09:19:54.549 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> s3p://bucket/job1/savepoints/savepoint-00-1234
>
> ...
>
> 23.07.2021 09:37:12.932 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job
>  from savepoint
> s3p://bucket/job1/savepoints/savepoint-00-1234 ()
>
> Thanks again for your help.
>
> Kr Thomas
>
> Yang Wang wrote on Fri, Jul 23, 2021 at 04:34:
>
>> Please note that when the job is canceled, the HA data(including the
>> checkpoint pointers) stored in the ConfigMap/ZNode will be deleted.
>>
>> But it is strange that the "-s/--fromSavepoint" does not take effect when
>> redeploying the Flink application. The JobManager logs could help a lot to
>> find the root cause.
>>
>> Best,
>> Yang
>>
>> Austin Cawley-Edwards wrote on Thu, Jul 22, 2021 at 11:09 PM:
>>
>>> Hey Thomas,
>>>
>>> Hmm, I see no reason why you should not be able to update the checkpoint
>>> interval at runtime, and don't believe that information is stored in a
>>> savepoint. Can you share the JobManager logs of the job where this is
>>> ignored?
>>>
>>> Thanks,
>>> Austin
>>>
>>> On Wed, Jul 21, 2021 at 11:47 AM Thms Hmm  wrote:
>>>
 Hey Austin,

 Thanks for your help.

 I tried to change the checkpoint interval as example. The value for it
 comes from an additional config file and is read and set within main() of
 the job.

 The job is running in Application mode. Basically the same
 configuration as from the official Flink website but instead of running the
 JobManager as job it is created as deployment.

 For the redeployment of the job the REST API is triggered to create a
 savepoint and cancel the job. After completion the deployment is updated
 and the pods are recreated. The -s  is always added as a
 parameter to start the JobManager (standalone-job.sh). CLI is not involved.
 We have automated these steps. But I tried the steps manually and have the
 same results.

 I also tried to trigger a savepoint, scale the pods down, update the
 start parameter with the recent savepoint and renamed
 'kubernetes.cluster-id' as well as 'high-availability.storageDir'.

 When I trigger a savepoint with cancel, I also see that the HA config
 maps are cleaned up.


 Kr Thomas

 Austin Cawley-Edwards wrote on Wed, Jul 21, 2021 at 16:52:

> Hi Thomas,
>
> I've got a few questions that will hopefully help get to find an
> answer:
>
> What job properties are you trying to change? Something like
> parallelism?
>
> What mode is your job running in? i.e., Session, Per-Job, or
> Application?
>
> Can you also describe how you're redeploying the job? Are you using
> the Native Kubernetes integration or Standalone (i.e. writing k8s  
> manifest
> files yourself)? It sounds like you are using the Flink CLI as well, is
> that correct?
>
> Thanks,
> Austin
>
> On Wed, Jul 21, 2021 at 4:05 AM Thms Hmm  wrote:
>
>> Hey,
>>
>> we have some application clusters running on Kubernetes and explore
>> the HA mode which is working as expected. When we try to upgrade a job,
>> e.g. trigger a savepoint, cancel the job and redeploy, Flink is not
>> restarting from the savepoint we provide using the -s parameter. So all
>> state is lost.
>>
>> If we just trigger the savepoint without canceling the job and
>> redeploy the HA mode picks up from the latest savepoint.
>>
>> But 

ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord

2021-07-23 Thread igyu
In my app's pom.xml I have:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.1</version>
    <scope>provided</scope>
</dependency>


and I have copied flink-connector-kafka_2.11-1.13.1.jar to FLINK_HOME/lib/,

but I still get an error:

ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord

How can I fix it?


igyu


Re: Questions about keyed streams

2021-07-23 Thread Senhong Liu
Hi Dan,

1) If the key doesn’t change in the downstream operators and you want to avoid 
shuffling, maybe the DataStreamUtils#reinterpretAsKeyedStream would be helpful.

2) I am not sure whether you mean that the data is already partitioned in
Kafka and you want to avoid the shuffle that keyBy() would introduce in Flink.
One solution is to partition your data in Kafka exactly as Flink's keyBy()
would partition it. After that, feel free to use
DataStreamUtils#reinterpretAsKeyedStream!
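For illustration, a minimal sketch of the reinterpretAsKeyedStream approach (the event type, its getUserId() accessor, and the class name are assumptions, not the actual job). It only reinterprets the stream and adds no network shuffle, so it is correct only if the incoming stream is already partitioned exactly as keyBy(userId) would partition it:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class ReinterpretExample {

    // Hypothetical event type carrying a user id.
    public static class Event {
        private final String userId;
        public Event(String userId) { this.userId = userId; }
        public String getUserId() { return userId; }
    }

    // Reinterpret a pre-partitioned stream as a keyed stream without a shuffle.
    public static KeyedStream<Event, String> asKeyed(DataStream<Event> prePartitioned) {
        return DataStreamUtils.reinterpretAsKeyedStream(
                prePartitioned,
                (KeySelector<Event, String>) event -> event.getUserId());
    }
}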

If your use case is not what I described above, maybe you can provide us more 
information.

Best,
Senhong

Sent with a Spark
On Jul 22, 2021, 7:33 AM +0800, Dan Hill , wrote:
> Hi.
>
> 1) If I use the same key in downstream operators (my key is a user id), will 
> the rows stay on the same TaskManager machine?  I join in more info based on 
> the user id as the key.  I'd like for these to stay on the same machine 
> rather than shuffle a bunch of user-specific info to multiple task manager 
> machines.
>
> 2) What are best practices to reduce the number of shuffles when having
> multiple Kafka topics with similar keys (user id)? E.g. should I make sure
> the same key writes to the same partition number and then manually assign
> which Flink tasks get which Kafka partitions?


Unsubscribe

2021-07-23 Thread 天分
Unsubscribe

Re: Reply: flink sql dependency isolation

2021-07-23 Thread Michael Ran
I suggest storing them separately when uploading, and pulling them down and referencing them separately when submitting the job.
On 2021-07-23 11:01:59, "silence" wrote:
>
>We are still mainly on YARN here. The current pain point is that a single SQL job uses multiple UDFs, and the UDFs are loaded via -C http://xxx.jar.
>There can be dependency conflicts between the UDFs and the SQL jar, and between the UDFs themselves.
>The initial idea is to bind each UDF to a jar path and load it with an independent classloader, to avoid dependency conflicts with the main jar and with other UDFs.
>--
>From: Michael Ran
>Sent: Thursday, July 22, 2021 20:07
>To: user-zh; silence
>Subject: Re: flink sql dependency isolation
>
>Isolate the references per job. You folks at Meituan are already on k8s, right?
>On 2021-07-05 14:06:53, "silence" wrote:
>>Could you tell me whether flink sql currently has any way to achieve dependency isolation,
>>e.g. for connectors, formats, and UDFs (the most important one)?
>>In many cases components defined by different users are mixed together, which leads to many dependency conflicts that are hard to resolve.
>>Is there currently a way to isolate UDF dependencies (e.g. each UDF using its own jar and classloader), or does the community have any plans for this?
>
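For illustration, a rough sketch (not an existing Flink feature) of the per-UDF classloader idea described above; the class and jar names are assumptions:

import java.net.URL;
import java.net.URLClassLoader;

public class IsolatedUdfLoader {
    // Load one UDF class from its own jar with a dedicated classloader so that
    // its transitive dependencies do not clash with other UDFs or the main jar.
    public static Object loadUdf(String jarUrl, String udfClassName) throws Exception {
        URLClassLoader udfClassLoader = new URLClassLoader(
                new URL[] {new URL(jarUrl)},                  // e.g. "http://xxx.jar"
                IsolatedUdfLoader.class.getClassLoader());    // parent delegation to the job classloader
        Class<?> udfClass = Class.forName(udfClassName, true, udfClassLoader);
        return udfClass.getDeclaredConstructor().newInstance();
    }
}

A production version would likely need child-first class loading, similar to Flink's own user-code classloader, so that UDF dependencies can shadow conflicting classes from the parent.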


Question about the Flink on Kubernetes development plan

2021-07-23 Thread laohu

Hi everyone!

    Could you share the development plan for Flink on Kubernetes? Is it possible to participate?




Re: How to connect the SQL client in k8s session mode

2021-07-23 Thread godfrey he
I created a JIRA suggesting that the documentation clearly describe how the SQL
client submits jobs to the various cluster deployments; you can follow
https://issues.apache.org/jira/browse/FLINK-23483

Best,
Godfrey


Caizhi Weng wrote on Fri, Jul 23, 2021 at 10:12 AM:

> Hi!
>
> You can consider exposing the Flink REST API address of the k8s session cluster,
> and then on the client side set execution.target to remote and set rest.address
> and rest.port to the corresponding address and port.
>
> maker_d...@foxmail.com wrote on Thu, Jul 22, 2021 at 9:46 PM:
>
> > Hi all,
> > I deployed Flink on a k8s cluster using the session mode from the official
> > documentation, and I can submit jobs normally.
> > Now I want to use the SQL client, but when submitting a job it reports:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.net.UnknownHostException: flink-cluster
> > How can I use the SQL client to connect to a Flink session on k8s?
> > Flink version 1.12.4.
> >
> >
> >
> > maker_d...@foxmail.com
> >
>


RE: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-23 Thread LINZ, Arnaud
Hello,

It’s hard to say what caused the timeout to trigger – I agree with you that it
should not have stopped the heartbeat thread, but it did. The easy fix was to
increase the timeout until we no longer saw our app self-killed. The task was running a
CPU-intensive computation (with a few threads created at some points, somehow
breaking the “slot number” contract).
For the RAM cache, I believe that the heartbeat may also time out
because of a busy network.

Cheers,
Arnaud


From: Till Rohrmann
Sent: Thursday, July 22, 2021 11:33
To: LINZ, Arnaud
Cc: Gen Luo; Yang Wang; dev; user
Subject: Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

Thanks for your inputs Gen and Arnaud.

I do agree with you, Gen, that we need better guidance for our users on when to 
change the heartbeat configuration. I think this should happen in any case. I 
am, however, not so sure whether we can give hard threshold like 5000 tasks, 
for example, because as Arnaud said it strongly depends on the workload. Maybe 
we can explain it based on symptoms a user might experience and what to do then.

Concerning your workloads, Arnaud, I'd be interested to learn a bit more. The 
user code runs in its own thread. This means that its operation won't block the 
main thread/heartbeat. The only thing that can happen is that the user code 
starves the heartbeat in terms of CPU cycles or causes a lot of GC pauses. If 
you are observing the former problem, then we might think about changing the 
priorities of the respective threads. This should then improve Flink's 
stability for these workloads and a shorter heartbeat timeout should be 
possible.

Also for the RAM-cached repositories, what exactly is causing the heartbeat to 
time out? Is it because you have a lot of GC or that the heartbeat thread does 
not get enough CPU cycles?

Cheers,
Till

On Thu, Jul 22, 2021 at 9:16 AM LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:
Hello,

From a user perspective: we have some (rare) use cases where we use “coarse
grain” datasets, with big beans and tasks that do lengthy operations (such as ML
training). In these cases we had to increase the timeout to huge values
(heartbeat.timeout: 50) so that our app is not killed.
I’m aware this is not the way Flink was meant to be used, but it’s a convenient 
way to distribute our workload on datanodes without having to use another 
concurrency framework (such as M/R) that would require the recoding of sources 
and sinks.

In some other (most common) cases, our tasks do R/W accesses to RAM-cached
repositories backed by a key-value storage such as Kudu (or HBase). While most of
those calls are very fast, sometimes when the system is under heavy load they
may block for more than a few seconds, and having our app killed because of a short
timeout is not an option.

That’s why I’m not in favor of very short timeouts… Because in my experience it 
really depends on what user code does in the tasks. (I understand that 
normally, as user code is not a JVM-blocking activity such as a GC, it should 
have no impact on heartbeats, but from experience, it really does)

Cheers,
Arnaud


From: Gen Luo <luogen...@gmail.com>
Sent: Thursday, July 22, 2021 05:46
To: Till Rohrmann <trohrm...@apache.org>
Cc: Yang Wang <danrtsey...@gmail.com>; dev <d...@flink.apache.org>; user <user@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

Hi,
Thanks for driving this @Till Rohrmann . I would 
give +1 on reducing the heartbeat timeout and interval, though I'm not sure if 
15s and 3s would be enough either.

IMO, besides the standalone cluster, where the heartbeat mechanism in Flink
is fully relied upon, reducing the heartbeat can also help the JM detect more
quickly TaskExecutors in abnormal conditions that cannot respond to heartbeat
requests, e.g. continuous full GC, even though the TaskExecutor process is
alive and the deployment system may not notice it. Since there are cases that
can benefit from this change, I think it could be done if it won't break the
experience in other scenarios.

If we can identify what blocks the main threads from processing heartbeats,
or what enlarges the GC cost, we can try to get rid of those causes to get a more
predictable heartbeat response time, or give some advice to users whose
jobs may encounter these issues. For example, as far as I know the JM of a large-scale
job is busier and may not be able to process heartbeats in time,
so we could advise users working with jobs larger than 5000 tasks
to enlarge their heartbeat interval to 10s and timeout to 50s. The numbers
are written casually.

As for the issue in FLINK-23216, I think it should be fixed and may not be a 
main concern for this case.

On Wed, Jul 21, 2021 at 6:26 PM Till Rohrmann <trohrm...@apache.org> wrote:
Thanks for sharing these 

Re: Flink user subscription request

2021-07-23 Thread David Morávek
Hi Samir,

to unsubscribe please send an empty-body / empty-subject email to
user-unsubscr...@flink.apache.org. You can see a community page [1] in docs
for more details.

[1] https://flink.apache.org/community.html

Best,
D.

On Fri, Jul 23, 2021 at 9:01 AM Samir Vasani  wrote:

> Hi,
>
> This is the user subscription request.
>
> Thanks & Regards,
> Samir Vasani
>
>


Re: Move already processed file from one folder to another folder in flink

2021-07-23 Thread Samir Vasani
Hi,

Can you elaborate more on the UDF approach, as I did not understand it?

Thanks & Regards,
Samir Vasani



On Fri, Jul 23, 2021 at 1:22 PM Caizhi Weng  wrote:

> Hi!
>
> In this case it won't work, as JobListener#onJobExecuted will only be
> called when the job finishes, successfully or unsuccessfully.
>
> For a forever-running job I would suggest adding a UDF right after the
> source and adding a special "EOF" record in each of the csv file. This UDF
> monitors the data flowing through it, and if it gets the EOF record it
> moves the file.
>
> Samir Vasani wrote on Fri, Jul 23, 2021 at 3:44 PM:
>
>> Hi Caizhi Weng,
>>
>> Thanks for your input.
>> I would explain the requirement in little more detail.
>> Flink pipeline will be running forever (until some issue happens and we
>> would need to restart) so It will continuously monitor if a new file comes
>> to the *input *folder or not.
>> In this case will your suggestion work?
>>
>>
>> Thanks & Regards,
>> Samir Vasani
>>
>>
>>
>> On Fri, Jul 23, 2021 at 1:07 PM Caizhi Weng  wrote:
>>
>>> Hi!
>>>
>>> JobListener#onJobExecuted might help, if your job is not a
>>> forever-running streaming job. See
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html
>>>
>>> Samir Vasani wrote on Fri, Jul 23, 2021 at 3:22 PM:
>>>
 Hi,

 I am a new bee to flink and facing some challenges to solve below use
 case

 Use Case description:

 I will receive a csv file with a timestamp on every single day in some
 folder say *input*.The file format would be
 *file_name_dd-mm-yy-hh-mm-ss.csv*.

 Now my flink pipeline will read this csv file in a row by row fashion
 and it will be written to my Kafka topic.

 Once the pipeline reads the entire file then this file needs to be
 moved to another folder say *historic* so that i can keep *input * folder
 empty for the new file.

 I googled a lot but did not find anything so can you guide me to
 achieve this.

 Let me know if anything else is required.


 Samir Vasani

>>>


Re: Recover from savepoints with Kubernetes HA

2021-07-23 Thread Thms Hmm
Finally I found the mistake. I passed the "--host 10.1.2.3" param as one
argument. I think the savepoint argument was then not interpreted correctly or
was ignored. It might be that "-s" was used as the value for "--host
10.1.2.3" and "s3p://…" as a new param, and because these are not valid
arguments they were ignored.

Not working:

23.07.2021 09:19:54.546 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Program Arguments:

...

23.07.2021 09:19:54.549 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --host 10.1.2.3

23.07.2021 09:19:54.549 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -s

23.07.2021 09:19:54.549 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
s3p://bucket/job1/savepoints/savepoint-00-1234

-

Working:

23.07.2021 09:19:54.546 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Program Arguments:

...

23.07.2021 09:19:54.549 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --host

23.07.2021 09:19:54.549 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 10.1.2.3

23.07.2021 09:19:54.549 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -s

23.07.2021 09:19:54.549 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
s3p://bucket/job1/savepoints/savepoint-00-1234

...

23.07.2021 09:37:12.932 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job
 from savepoint
s3p://bucket/job1/savepoints/savepoint-00-1234 ()

Thanks again for your help.

Kr Thomas

Yang Wang wrote on Fri, Jul 23, 2021 at 04:34:

> Please note that when the job is canceled, the HA data(including the
> checkpoint pointers) stored in the ConfigMap/ZNode will be deleted.
>
> But it is strange that the "-s/--fromSavepoint" does not take effect when
> redeploying the Flink application. The JobManager logs could help a lot to
> find the root cause.
>
> Best,
> Yang
>
> Austin Cawley-Edwards wrote on Thu, Jul 22, 2021 at 11:09 PM:
>
>> Hey Thomas,
>>
>> Hmm, I see no reason why you should not be able to update the checkpoint
>> interval at runtime, and don't believe that information is stored in a
>> savepoint. Can you share the JobManager logs of the job where this is
>> ignored?
>>
>> Thanks,
>> Austin
>>
>> On Wed, Jul 21, 2021 at 11:47 AM Thms Hmm  wrote:
>>
>>> Hey Austin,
>>>
>>> Thanks for your help.
>>>
>>> I tried to change the checkpoint interval as example. The value for it
>>> comes from an additional config file and is read and set within main() of
>>> the job.
>>>
>>> The job is running in Application mode. Basically the same configuration
>>> as from the official Flink website but instead of running the JobManager as
>>> job it is created as deployment.
>>>
>>> For the redeployment of the job the REST API is triggered to create a
>>> savepoint and cancel the job. After completion the deployment is updated
>>> and the pods are recreated. The -s  is always added as a
>>> parameter to start the JobManager (standalone-job.sh). CLI is not involved.
>>> We have automated these steps. But I tried the steps manually and have the
>>> same results.
>>>
>>> I also tried to trigger a savepoint, scale the pods down, update the
>>> start parameter with the recent savepoint and renamed
>>> 'kubernetes.cluster-id' as well as 'high-availability.storageDir'.
>>>
>>> When I trigger a savepoint with cancel, I also see that the HA config
>>> maps are cleaned up.
>>>
>>>
>>> Kr Thomas
>>>
>>> Austin Cawley-Edwards wrote on Wed, Jul 21, 2021 at 16:52:
>>>
 Hi Thomas,

 I've got a few questions that will hopefully help get to find an answer:

 What job properties are you trying to change? Something like
 parallelism?

 What mode is your job running in? i.e., Session, Per-Job, or
 Application?

 Can you also describe how you're redeploying the job? Are you using the
 Native Kubernetes integration or Standalone (i.e. writing k8s  manifest
 files yourself)? It sounds like you are using the Flink CLI as well, is
 that correct?

 Thanks,
 Austin

 On Wed, Jul 21, 2021 at 4:05 AM Thms Hmm  wrote:

> Hey,
>
> we have some application clusters running on Kubernetes and explore
> the HA mode which is working as expected. When we try to upgrade a job,
> e.g. trigger a savepoint, cancel the job and redeploy, Flink is not
> restarting from the savepoint we provide using the -s parameter. So all
> state is lost.
>
> If we just trigger the savepoint without canceling the job and
> redeploy the HA mode picks up from the latest savepoint.
>
> But this way we can not upgrade job properties as they were picked up
> from the savepoint as it seems.
>
> Is there any advice on how to do upgrades with HA enabled?
>
> Flink version is 1.12.2.
>
> Thanks for your help.
>
> Kr thomas
>



Re: Move already processed file from one folder to another folder in flink

2021-07-23 Thread Caizhi Weng
Hi!

In this case it won't work, as JobListener#onJobExecuted will only be
called when the job finishes, successfully or unsuccessfully.

For a forever-running job I would suggest adding a UDF right after the
source and adding a special "EOF" record in each of the csv files. This UDF
monitors the data flowing through it, and if it gets the EOF record it
moves the file.
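A minimal sketch of that suggestion (not production code); the record type, its fields, and the folder paths are assumptions for illustration:

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class EofAwareMover extends ProcessFunction<EofAwareMover.FileRecord, EofAwareMover.FileRecord> {

    // Hypothetical record type: one csv row plus the name of the file it came from.
    public static class FileRecord {
        public String fileName;
        public String row;
        public boolean eof;
    }

    @Override
    public void processElement(FileRecord record, Context ctx, Collector<FileRecord> out) throws Exception {
        if (record.eof) {
            // The special EOF record marks the end of its file: move it to "historic".
            Path source = Paths.get("/data/input", record.fileName);
            Path target = Paths.get("/data/historic", record.fileName);
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
        } else {
            out.collect(record); // normal rows continue on to the Kafka sink
        }
    }
}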

Samir Vasani wrote on Fri, Jul 23, 2021 at 3:44 PM:

> Hi Caizhi Weng,
>
> Thanks for your input.
> I would explain the requirement in little more detail.
> Flink pipeline will be running forever (until some issue happens and we
> would need to restart) so It will continuously monitor if a new file comes
> to the *input *folder or not.
> In this case will your suggestion work?
>
>
> Thanks & Regards,
> Samir Vasani
>
>
>
> On Fri, Jul 23, 2021 at 1:07 PM Caizhi Weng  wrote:
>
>> Hi!
>>
>> JobListener#onJobExecuted might help, if your job is not a
>> forever-running streaming job. See
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html
>>
>> Samir Vasani wrote on Fri, Jul 23, 2021 at 3:22 PM:
>>
>>> Hi,
>>>
>>> I am a new bee to flink and facing some challenges to solve below use
>>> case
>>>
>>> Use Case description:
>>>
>>> I will receive a csv file with a timestamp on every single day in some
>>> folder say *input*.The file format would be
>>> *file_name_dd-mm-yy-hh-mm-ss.csv*.
>>>
>>> Now my flink pipeline will read this csv file in a row by row fashion
>>> and it will be written to my Kafka topic.
>>>
>>> Once the pipeline reads the entire file then this file needs to be moved
>>> to another folder say *historic* so that i can keep *input * folder
>>> empty for the new file.
>>>
>>> I googled a lot but did not find anything so can you guide me to achieve
>>> this.
>>>
>>> Let me know if anything else is required.
>>>
>>>
>>> Samir Vasani
>>>
>>


Re: Move already processed file from one folder to another folder in flink

2021-07-23 Thread Samir Vasani
Hi Caizhi Weng,

Thanks for your input.
I will explain the requirement in a little more detail.
The Flink pipeline will be running forever (until some issue happens and we
need to restart), so it will continuously monitor whether a new file comes
to the *input* folder or not.
In this case, will your suggestion work?


Thanks & Regards,
Samir Vasani



On Fri, Jul 23, 2021 at 1:07 PM Caizhi Weng  wrote:

> Hi!
>
> JobListener#onJobExecuted might help, if your job is not a forever-running
> streaming job. See
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html
>
> Samir Vasani wrote on Fri, Jul 23, 2021 at 3:22 PM:
>
>> Hi,
>>
>> I am a new bee to flink and facing some challenges to solve below use case
>>
>> Use Case description:
>>
>> I will receive a csv file with a timestamp on every single day in some
>> folder say *input*.The file format would be
>> *file_name_dd-mm-yy-hh-mm-ss.csv*.
>>
>> Now my flink pipeline will read this csv file in a row by row fashion and
>> it will be written to my Kafka topic.
>>
>> Once the pipeline reads the entire file then this file needs to be moved
>> to another folder say *historic* so that i can keep *input * folder
>> empty for the new file.
>>
>> I googled a lot but did not find anything so can you guide me to achieve
>> this.
>>
>> Let me know if anything else is required.
>>
>>
>> Samir Vasani
>>
>


Re: Move already processed file from one folder to another folder in flink

2021-07-23 Thread Caizhi Weng
Hi!

JobListener#onJobExecuted might help, if your job is not a forever-running
streaming job. See
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html
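For completeness, a minimal sketch of the JobListener approach (only useful when the job is bounded and actually finishes); the folder layout and the file name pattern follow the original question and are otherwise assumptions:

import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MoveFileOnCompletion {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                // nothing to do on submission
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                if (throwable == null) {
                    try {
                        // move the processed file once the (bounded) job has finished
                        Files.move(
                                Paths.get("/data/input/file_name_23-07-21-10-00-00.csv"),
                                Paths.get("/data/historic/file_name_23-07-21-10-00-00.csv"),
                                StandardCopyOption.REPLACE_EXISTING);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        });
        // ... build the pipeline that reads the csv and writes to Kafka, then call env.execute();
    }
}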

Samir Vasani wrote on Fri, Jul 23, 2021 at 3:22 PM:

> Hi,
>
> I am a new bee to flink and facing some challenges to solve below use case
>
> Use Case description:
>
> I will receive a csv file with a timestamp on every single day in some
> folder say *input*.The file format would be
> *file_name_dd-mm-yy-hh-mm-ss.csv*.
>
> Now my flink pipeline will read this csv file in a row by row fashion and
> it will be written to my Kafka topic.
>
> Once the pipeline reads the entire file then this file needs to be moved
> to another folder say *historic* so that i can keep *input * folder empty
> for the new file.
>
> I googled a lot but did not find anything so can you guide me to achieve
> this.
>
> Let me know if anything else is required.
>
>
> Samir Vasani
>


Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-07-23 Thread Flavio Pompermaier
Could this be related to https://issues.apache.org/jira/browse/FLINK-22414?

On Thu, Jul 22, 2021 at 3:53 PM Timo Walther  wrote:

> Thanks, this should definitely work with the pre-packaged connectors of
> Ververica platform.
>
> I guess we have to investigate what is going on. Until then, a
> workaround could be to add Hadoop manually and set the HADOOP_CLASSPATH
> environment variable. The root cause seems that Hadoop cannot be found.
>
> Alternatively, you could also build a custom image and include Hadoop in
> the lib folder of Flink:
>
> https://docs.ververica.com/v1.3/platform/installation/custom_images.html
>
> I hope this helps. I will get back to you if we have a fix ready.
>
> Regards,
> Timo
>
>
>
> On 22.07.21 14:30, Natu Lauchande wrote:
> > Sure.
> >
> > That's how the ddl table looks like:
> >
> > CREATE TABLE tablea (
> >   `a` BIGINT,
> >   `b` BIGINT,
> >   `c` BIGINT
> > )
> > COMMENT ''
> > WITH (
> >   'auto-compaction'='false',
> >   'connector'='filesystem',
> >   'format'='parquet',
> >   'parquet.block.size'='134217728',
> >   'parquet.compression'='SNAPPY',
> >   'parquet.dictionary.page.size'='1048576',
> >   'parquet.enable.dictionary'='true',
> >   'parquet.page.size'='1048576',
> >   'parquet.writer.max-padding'='2097152',
> >   'path'='s3a://test/test',
> >   'sink.partition-commit.delay'='1 h',
> >   'sink.partition-commit.policy.kind'='success-file',
> >   'sink.partition-commit.success-file.name'='_SUCCESS',
> >   'sink.partition-commit.trigger'='process-time',
> >   'sink.rolling-policy.check-interval'='20 min',
> >   'sink.rolling-policy.file-size'='128MB',
> >   'sink.rolling-policy.rollover-interval'='2 h'
> > );
> >
> >
> >
> > When I change the connector to a blackhole it immediately works without
> > errors. I have redacted the names and paths.
> >
> >
> >
> > Thanks,
> > Natu
> >
> >
> > On Thu, Jul 22, 2021 at 2:24 PM Timo Walther  > > wrote:
> >
> > Maybe you can share also which connector/format you are using? What
> is
> > the DDL?
> >
> > Regards,
> > Timo
> >
> >
> > On 22.07.21 14:11, Natu Lauchande wrote:
> >  > Hey Timo,
> >  >
> >  > Thanks for the reply.
> >  >
> >  > No custom file as we are using Flink SQL and submitting the job
> > directly
> >  > through the SQL Editor UI. We are using Flink 1.13.1 as the
> > supported
> >  > flink version. No custom code all through Flink SQL on UI no jars.
> >  >
> >  > Thanks,
> >  > Natu
> >  >
> >  > On Thu, Jul 22, 2021 at 2:08 PM Timo Walther  > 
> >  > >> wrote:
> >  >
> >  > Hi Natu,
> >  >
> >  > Ververica Platform 2.5 has updated the bundled Hadoop version
> > but this
> >  > should not result in a NoClassDefFoundError exception. How
> > are you
> >  > submitting your SQL jobs? You don't use Ververica's SQL
> > service but
> >  > have
> >  > built a regular JAR file, right? If this is the case, can you
> > share
> >  > your
> >  > pom.xml file with us? The Flink version stays constant at
> 1.12?
> >  >
> >  > Regards,
> >  > Timo
> >  >
> >  > On 22.07.21 12:22, Natu Lauchande wrote:
> >  >  > Good day Flink community,
> >  >  >
> >  >  > Apache Flink/Ververica Community Edition - Question
> >  >  >
> >  >  >
> >  >  > I am having an issue with my Flink SQL jobs since updating
> >  > from Flink
> >  >  > 1.12/Ververica 2.4 to Ververica 2.5 . For all the jobs
> > running on
> >  >  > parquet and S3 i am getting the following error
> continuously:
> >  >  >
> >  >  > INITIALIZING to FAILED on 10.243.3.0:42337-2a3224 @
> >  >  > 10-243-3-0.flink-metrics.vvp-jobs.svc.cluster.local
> > (dataPort=39309).
> >  >  >
> >  >  > java.lang.NoClassDefFoundError:
> > org/apache/hadoop/conf/Configuration
> >  >  >
> >  >  > at java.lang.Class.getDeclaredConstructors0(Native Method)
> >  > ~[?:1.8.0_292]
> >  >  >
> >  >  > at
> > java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
> >  >  > ~[?:1.8.0_292]
> >  >  >
> >  >  > at java.lang.Class.getDeclaredConstructors(Class.java:2020)
> >  > ~[?:1.8.0_292]
> >  >  >
> >  >  > **
> >  >  >
> >  >  > at
> > java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
> >  >  > ~[?:1.8.0_292]
> >  >  >
> >  >  > at
> >  >  >
> >  >
> >
>  
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
> >  >
> >  >  > 

Move already processed file from one folder to another folder in flink

2021-07-23 Thread Samir Vasani
Hi,

I am a newbie to Flink and am facing some challenges solving the use case below.

Use case description:

I will receive a csv file with a timestamp on every single day in some
folder, say *input*. The file format would be
*file_name_dd-mm-yy-hh-mm-ss.csv*.

My Flink pipeline will read this csv file row by row, and each row will be
written to my Kafka topic.

Once the pipeline has read the entire file, the file needs to be moved to
another folder, say *historic*, so that I can keep the *input* folder empty for
the new file.

I googled a lot but did not find anything, so can you guide me on how to
achieve this?

Let me know if anything else is required.


Samir Vasani


Flink user subscription request

2021-07-23 Thread Samir Vasani
Hi,

This is the user subscription request.

Thanks & Regards,
Samir Vasani


YarnTaskExecutorRunner should contain MapReduce classes

2021-07-23 Thread chenkaibit



Hi:
I followed the instructions described in
[https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive]
and tested the Hive streaming sink, and met this exception:

  Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf

[http://apache-flink.147419.n8.nabble.com/Flink-td7866.html] reports the same problem.

I checked the TM JVM environment variables and the code, and found that Flink only
sets up YARN_APPLICATION_CLASSPATH, without MAPREDUCE_APPLICATION_CLASSPATH.

See:
[https://github.com/apache/flink/blob/ed39fb2efc790af038c1babd4a48847b7b39f91e/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java#L119]

I think we should add MAPREDUCE_APPLICATION_CLASSPATH as well, the same as Spark does.
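For illustration, a rough sketch (an assumption of how it could look, not Flink's actual code) of appending the MapReduce application classpath next to the YARN application classpath; the constant names come from Hadoop's YarnConfiguration and MRJobConfig and may vary across Hadoop versions:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class ClasspathSketch {
    public static List<String> applicationClasspath(Configuration conf) {
        List<String> classpath = new ArrayList<>();
        // YARN application classpath (what Flink already adds today)
        for (String entry : conf.getTrimmedStrings(
                YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
            classpath.add(entry);
        }
        // MapReduce application classpath (the proposed addition), so that classes
        // like org.apache.hadoop.mapred.JobConf are visible to the TaskExecutor
        for (String entry : StringUtils.getStrings(
                conf.get(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
                        MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH))) {
            classpath.add(entry.trim());
        }
        return classpath;
    }
}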



I created https://issues.apache.org/jira/browse/FLINK-23449 for this and hope someone can review it.