退订

2021-04-09 Thread 541122...@qq.com
退订



541122...@qq.com


Flink Metric isBackPressured not available

2021-04-09 Thread Claude M
Hello,

The documentation here
https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html
states there is a isBackPressured metric available yet I don't see it.  Any
ideas why?


Thanks


Re: FLINK Kinesis consumer Checkpointing data loss

2021-04-09 Thread Vijayendra Yadav
Thank You it helped.


> On Apr 8, 2021, at 10:53 PM, Arvid Heise  wrote:
> 
> 
> Hi Vijay,
> 
> if you don't specify a checkpoint, then Flink assumes you want to start from 
> scratch (e.g., you had a bug in your business logic and need to start 
> completely without state).
> 
> If there is any failure and Flink restarts automatically, it will always pick 
> up from the latest checkpoint [1].
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#recovery
> 
>> On Thu, Apr 8, 2021 at 11:08 PM Vijayendra Yadav  
>> wrote:
>> Thanks it was working fine with: bin/flink run  -s 
>> s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/
>>  \
>> 
>>> On Thu, Apr 8, 2021 at 11:42 AM Vijayendra Yadav  
>>> wrote:
>>> Hi Arvid,
>>> 
>>> Thanks for your response. I did not restart from the checkpoint. I assumed 
>>> Flink would look for a checkpoint upon restart automatically. 
>>> 
>>> I should restart like below ?
>>> 
>>> bin/flink run  -s 
>>> s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/
>>>  \
>>> 
>>> Thanks,
>>> Vijay
>>> 
 On Thu, Apr 8, 2021 at 12:52 AM Arvid Heise  wrote:
 Hi Vijay,
 
 edit: After re-reading your message: are you sure that you restart from a 
 checkpoint/savepoint? If you just start the application anew and use 
 LATEST initial position, this is the expected bahvior.
 
 --- original intended answer if you restart from checkpoint
 
 this is definitively not the expected behavior.
 
 To exclude certain error sources:
 - Could you double-check if this is also happening if you don't use 
 unaligned checkpoints? (I don't really think this is because of unaligned 
 checkpoint, but it's better to be sure and we want to reduce the possible 
 error sources)
 - Can you see the missing messages still in Kinesis?
 - Could you extract all log INFO statements from 
 org.apache.flink.streaming.connectors.kinesis and attach them here?
 - How long did you wait with recovery?
 
 
 
> On Wed, Apr 7, 2021 at 8:03 PM Vijayendra Yadav  
> wrote:
> Hi Team,
> 
> We are trying to make sure we are not losing data when KINESIS Consumer 
> is down.
> 
> Kinesis streaming Job which has following checkpointing properties:
> 
> // checkpoint every X msecs
> env.enableCheckpointing(Conf.getFlinkCheckpointInterval());
> // enable externalized checkpoints which are retained after job 
> cancellation
> env.getCheckpointConfig().enableExternalizedCheckpoints(
> 
> CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
> );
> // allow job recovery fallback to checkpoint when there is a more recent 
> savepoint
>
> env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());
>  // enables the experimental unaligned checkpoints
> env.getCheckpointConfig().enableUnalignedCheckpoints();
> //checkpointpath
> env.setStateBackend(new 
> FsStateBackend(Conf.getFlinkCheckPointPath(), true));
> 
> 1) We killed the Kinesis Job
> 2) Sent messages to KDS while Consumer was down.
> 3) Restarted Flink Consumer, messages which were sent during the Consumer 
> down period, never ingested (data loss).
> 4) Re-sent messages to KDS while the consumer was still up. Messages did 
> ingest fine.
> 
> How can I avoid data loss for #3 ??
> 
> From Logs:
> 
> 2021-04-07 12:15:49,161 INFO  
> org.apache.flink.runtime.jobmaster.JobMaster  - Using 
> application-defined state backend: File State Backend (checkpoints: 
> 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous: 
> TRUE, fileStateThreshold: -1)
> 
> 2021-04-07 12:16:02,343 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591 
> ms).
> 2021-04-07 12:16:11,951 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
> Triggering checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job 
> 8943d16e22b8aaf65d6b9e2b8bd54113.
> 2021-04-07 12:16:12,483 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411 
> ms).
> 
> Thanks,
> Vijay


Re: clear() in a ProcessWindowFunction

2021-04-09 Thread Roman Khachatryan
Hi Vishal,

Sorry for the late reply,
Please find my answers below.
By state I assume the state obtained via getRuntimeContext (access to
window state is not allowed)..

> The state is scoped to the key (created per key in the ProcessWindowFunction 
> with a ttl )
Yes.

> The state will remain alive irrespective of whether the Window is closed or 
> not (a TTL timer does the collection )
Right, but you need to configure TTL when accessing the state [1]

>  The execution on a key is sequential , as in if 2 events arrive for the 2 
> Sessions they happen sequentially ( or in any order but without the need of 
> synchronization )
Right.

> The state mutated by an event in Session A, will be visible to Session B if 
> an event incident on Session B was to happen subsequently.  There is no need 
> of synchronizing access to the state as it for the same key.
Right.

Your understanding of merging of window contents is also correct.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl

Regards,
Roman


On Wed, Mar 31, 2021 at 5:45 PM Vishal Santoshi
 wrote:
>
> I had a query Say I have a single key with 2 live sessions ( A and B )  with 
> a configured lateness .
>
> Do these invariants hold?
>
> * The state is scoped to the key (created per key in the 
> ProcessWindowFunction with a ttl )
> * The state will remain alive irrespective of whether the Window is closed or 
> not (a TTL timer does the collection )
> *  The execution on a key is sequential , as in if 2 events arrive for the 2 
> Sessions they happen sequentially ( or in any order but without the need of 
> synchronization )
> * The state mutated by an event in Session A, will be visible to Session B if 
> an event incident on Session B was to happen subsequently.  There is no need 
> of synchronizing access to the state as it for the same key.
>
> What I am not sure about is what happens when session A merge with session B. 
> I would assume that it just is defining new start and end of the merged 
> window, Gcing the old ones ( or at least one of them ) and assigning that 
> even to that new window. What one does with the custom state in 
> ProcessWindowFunction ( there is a CountTrigger of 1 ) ,  really what is done 
> in the process method above,  As in this state is 1 degree removed from what 
> ever flink does internally with it's merges given that the state is scoped to 
> the key.
>
>
>
>
>
>
>
> On Fri, Mar 12, 2021 at 12:37 PM Vishal Santoshi  
> wrote:
>>
>> Yep, makes sense.
>>
>> On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan  wrote:
>>>
>>> > Want to confirm that the keys are GCed ( along with state ) once the  
>>> > (windows close + lateness ) ?
>>> Window state is cleared (as well as the window itself), but global
>>> state is not (unless you use TTL).
>>>
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>>>
>>> Regards,
>>> Roman
>>>
>>> On Fri, Mar 12, 2021 at 2:16 PM Vishal Santoshi
>>>  wrote:
>>> >
>>> > Sometimes writing it down makes you think. I now realize that this is not 
>>> > the right approach, given that merging windows will have their own 
>>> > states..and how the merge happens is really at the key level
>>> >
>>> >
>>> >
>>> > On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi 
>>> >  wrote:
>>> >>
>>> >> I intend to augment every event in a session  with a unique ID.  To keep 
>>> >> the session lean, there is a PurgingTrigger on this aggregate that  
>>> >> fires on a count of 1.
>>> >>
>>> >> >> (except that the number of keys can grow).
>>> >>
>>> >> Want to confirm that the keys are GCed ( along with state ) once the  
>>> >> (windows close + lateness ) ?
>>> >>
>>> >>
>>> >>
>>> >> On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan  
>>> >> wrote:
>>> >>>
>>> >>> Hi Vishal,
>>> >>>
>>> >>> There is no leak in the code you provided (except that the number of
>>> >>> keys can grow).
>>> >>> But as you figured out the state is scoped to key, not to window+key.
>>> >>>
>>> >>> Could you explain what you are trying to achieve and why do you need to 
>>> >>> combine
>>> >>> sliding windows with state scoped to window+key?
>>> >>>
>>> >>> Regards,
>>> >>> Roman
>>> >>>
>>> >>> On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi
>>> >>>  wrote:
>>> >>> >
>>> >>> > Essentially, Does this code leak state
>>> >>> >
>>> >>> > private static class SessionIdProcessWindowFunction>> >>> > java.io.Serializable, VALUE extends java.io.Serializable>
>>> >>> > extends
>>> >>> > ProcessWindowFunction, 
>>> >>> > KeyedSessionWithSessionID, KEY, TimeWindow> {
>>> >>> > private static final long serialVersionUID = 1L;
>>> >>> > private final static ValueStateDescriptor sessionId = new 
>>> >>> > ValueStateDescriptor("session_uid",
>>> >>> > String.class);
>>> >>> >
>>> >>> > @Override
>>> >>> > public void process(KEY key,
>>> >>> > ProcessWindowFunction, 
>>> >>> > KeyedSessionWithSessionID, KEY, 

Re: Task manager local state data after crash / recovery

2021-04-09 Thread dhanesh arole
Thanks a lot for answering it in detail. This makes sense and cleared lots
of doubt.

On Fri, 9 Apr 2021 at 13:02 Till Rohrmann  wrote:

> Hi Dhanesh,
>
> The way local state works in Flink currently is the following: The user
> configures a `taskmanager.state.local.root-dirs` or the tmp directory is
> used where Flink creates a "localState" directory. This is the base
> directory for all local state. Within this directory a TaskManager creates
> for every allocation a sub directory using the `AllocationID`. Inside this
> directory, Flink then stores the local state artefacts.
>
> When Flink frees an allocation, then the corresponding directory is
> deleted. In case that the process is being killed via a SIGTERM signal,
> Flink also registers a shut down hook which tries to delete all directories
> for the known `AllocationIDs`. If the shut down hooks do not run (e.g.
> killed via SIGKILL), then Flink leaves some residual state.
>
> Now the problem is what happens if the TaskManager process is restarted on
> the same machine. In this case, Flink will simply use the same local state
> directory but it ignores existing allocation id sub directories. The reason
> is that Flink does not know whether these allocation id sub directories are
> not used by another Flink process running on the same machine. In order to
> make this decision Flink would have to know that it is the owner of these
> sub directories. This could work if each TaskManager process is started
> with a unique ID and if this ID is reused across restart attempts. This is
> currently not for every deployment the case.
>
> Long story short, it is currently expected that Flink can leave some
> residual state in case of a hard process stop. Cleaning this state up is at
> the moment unfortunately the responsibility of the user.
>
> Cheers,
> Till
>
> On Tue, Apr 6, 2021 at 4:55 PM dhanesh arole 
> wrote:
>
>> Hey all,
>>
>> We are running a stateful stream processing job on k8s using per-job
>> standalone deployment entrypoint. Flink version: 1.12.1
>>
>> *Problem*: We have observed that whenever a task manager is either
>> gracefully shut down or killed ( due to OOM, k8s worker node drain out etc
>> ) it doesn't clean up the rocksdb state directories from the local disk.
>> But when the task manager restarts and it receives new task allocation from
>> the resource manager it rebuilds its local state for those tasks from the
>> previous completed checkpoint. Over the period of time after multiple
>> restarts, the task manager's local disk ends up accumulating lots of such
>> orphan rocksdb directories.
>>
>> *Questions*: This isn't causing any functional issues to us, but it adds
>> up lots of repeated ops overhead of cleaning these disks periodically. As a
>> workaround, we are thinking of cleaning the local rocksdb directories
>> except for the *taskmanager.state.local.root-dirs *before starting the
>> task manager java process. Since, during every task manager restart keyed
>> state backends for allocated tasks are anyway restored we feel it is the
>> safest option atm and will solve our problem of ever growing disk on task
>> manager pods. Is it safe to do so or are there any other consequences of
>> it? Is there any config or restart policy that takes care of cleaning up
>> such stale rocksdb directories during the statebackend restore process?.
>>
>> A sort of similar clean up is required when local task recovery is
>> enabled. Whenever the task manager is not shut down gracefully the old
>> localState doesn't get cleaned up on the next restart. This also causes
>> lots of disk space wastage. It's easier to delete rocksdb working
>> directories from previou run, but not so straightforward for the localState
>> as one has to figure out which one of them are actually stale allocation
>> IDs and clean only those one. Or check the latest completed checkpoint and
>> delete all localStates directories for older checkpoints and
>> allocation-ids. Is there any other solution to this problem? Also would
>> like to learn from other users how are you handling these operational tasks
>> currently?
>>
>> configurations:
>>
>> state.backend.local-recovery: true
>> taskmanager.state.local.root-dirs: /data/flink/
>>
>> RocksDb backend DB storage path:  /data/flink ( set programmatically )
>>
>>
>> -
>> Dhanesh Arole
>>
> --
- Dhanesh ( sent from my mobile device. Pardon me for any typos )


Re: Proper way to get DataStream

2021-04-09 Thread Maminspapin
Arvid Heise-4, Ok, this is clear for me now. Good answer. 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Task manager local state data after crash / recovery

2021-04-09 Thread Till Rohrmann
Hi Dhanesh,

The way local state works in Flink currently is the following: The user
configures a `taskmanager.state.local.root-dirs` or the tmp directory is
used where Flink creates a "localState" directory. This is the base
directory for all local state. Within this directory a TaskManager creates
for every allocation a sub directory using the `AllocationID`. Inside this
directory, Flink then stores the local state artefacts.

When Flink frees an allocation, then the corresponding directory is
deleted. In case that the process is being killed via a SIGTERM signal,
Flink also registers a shut down hook which tries to delete all directories
for the known `AllocationIDs`. If the shut down hooks do not run (e.g.
killed via SIGKILL), then Flink leaves some residual state.

Now the problem is what happens if the TaskManager process is restarted on
the same machine. In this case, Flink will simply use the same local state
directory but it ignores existing allocation id sub directories. The reason
is that Flink does not know whether these allocation id sub directories are
not used by another Flink process running on the same machine. In order to
make this decision Flink would have to know that it is the owner of these
sub directories. This could work if each TaskManager process is started
with a unique ID and if this ID is reused across restart attempts. This is
currently not for every deployment the case.

Long story short, it is currently expected that Flink can leave some
residual state in case of a hard process stop. Cleaning this state up is at
the moment unfortunately the responsibility of the user.

Cheers,
Till

On Tue, Apr 6, 2021 at 4:55 PM dhanesh arole  wrote:

> Hey all,
>
> We are running a stateful stream processing job on k8s using per-job
> standalone deployment entrypoint. Flink version: 1.12.1
>
> *Problem*: We have observed that whenever a task manager is either
> gracefully shut down or killed ( due to OOM, k8s worker node drain out etc
> ) it doesn't clean up the rocksdb state directories from the local disk.
> But when the task manager restarts and it receives new task allocation from
> the resource manager it rebuilds its local state for those tasks from the
> previous completed checkpoint. Over the period of time after multiple
> restarts, the task manager's local disk ends up accumulating lots of such
> orphan rocksdb directories.
>
> *Questions*: This isn't causing any functional issues to us, but it adds
> up lots of repeated ops overhead of cleaning these disks periodically. As a
> workaround, we are thinking of cleaning the local rocksdb directories
> except for the *taskmanager.state.local.root-dirs *before starting the
> task manager java process. Since, during every task manager restart keyed
> state backends for allocated tasks are anyway restored we feel it is the
> safest option atm and will solve our problem of ever growing disk on task
> manager pods. Is it safe to do so or are there any other consequences of
> it? Is there any config or restart policy that takes care of cleaning up
> such stale rocksdb directories during the statebackend restore process?.
>
> A sort of similar clean up is required when local task recovery is
> enabled. Whenever the task manager is not shut down gracefully the old
> localState doesn't get cleaned up on the next restart. This also causes
> lots of disk space wastage. It's easier to delete rocksdb working
> directories from previou run, but not so straightforward for the localState
> as one has to figure out which one of them are actually stale allocation
> IDs and clean only those one. Or check the latest completed checkpoint and
> delete all localStates directories for older checkpoints and
> allocation-ids. Is there any other solution to this problem? Also would
> like to learn from other users how are you handling these operational tasks
> currently?
>
> configurations:
>
> state.backend.local-recovery: true
> taskmanager.state.local.root-dirs: /data/flink/
>
> RocksDb backend DB storage path:  /data/flink ( set programmatically )
>
>
> -
> Dhanesh Arole
>


Re: Flink 1.13 and CSV (batch) writing

2021-04-09 Thread Flavio Pompermaier
That's absolutely useful. IMHO also join should work without
windows/triggers and left/right outer joins should be easier in order to
really migrate legacy code.
Also reduceGroup would help but less urgent.
I hope that my feedback as Flink user could be useful.

Best,
Flavio

On Fri, Apr 9, 2021 at 12:38 PM Kurt Young  wrote:

> Converting from table to DataStream in batch mode is indeed a problem now.
> But I think this will
> be improved soon.
>
> Best,
> Kurt
>
>
> On Fri, Apr 9, 2021 at 6:14 PM Flavio Pompermaier 
> wrote:
>
>> In my real CSV I have LONG columns that can contain null values. In that
>> case I get a parse exception (and I would like to avoid to read it as a
>> string).
>> The ',bye' is just the way you can test that in my example (add that line
>> to the input csv).
>> If I use  'csv.null-literal' = '' it seems to work but, is it a
>> workaround or it is the right solution?
>>
>> Another big problem I'm having with the new APIs is that if I use
>> TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>> then I can't convert a table to a datastream..I need to use
>> StreamTableEnvironment tableEnv =
>> StreamTableEnvironment.create(streamEnv, envSettings);
>> but in that case I can't use inBatchMode..
>>
>> On Fri, Apr 9, 2021 at 11:44 AM Kurt Young  wrote:
>>
>>> `format.ignore-first-line` is unfortunately a regression compared to the
>>> old one.
>>> I've created a ticket [1] to track this but according to current design,
>>> it seems not easy to do.
>>>
>>> Regarding null values, I'm not sure if I understand the issue you had.
>>> What do you mean by
>>> using ',bye' to test null Long values?
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-22178
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier 
>>> wrote:
>>>
 And another thing: in my csv I added ',bye' (to test null Long values)
 but I get a parse error..if I add  'csv.null-literal' = '' it seems to
 work..is that the right way to solve this problem?

 On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> Thanks Kurt, now it works. However I can't find a way to skip the CSV
> header..before there was  "format.ignore-first-line" but now I can't find
> another way to skip it.
> I could set csv.ignore-parse-errors to true but then I can't detect
> other parsing errors, otherwise I need to manually transofrm the header
> into a comment adding the # character at the start of the line..
> How can I solve that?
>
> On Fri, Apr 9, 2021 at 4:07 AM Kurt Young  wrote:
>
>> My DDL is:
>>
>> CREATE TABLE csv (
>>id BIGINT,
>>name STRING
>> ) WITH (
>>'connector' = 'filesystem',
>>'path' = '.',
>>'format' = 'csv'
>> );
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young  wrote:
>>
>>> Hi Flavio,
>>>
>>> We would recommend you to use new table source & sink interfaces,
>>> which have different
>>> property keys compared to the old ones, e.g. 'connector' v.s.
>>> 'connector.type'.
>>>
>>> You can follow the 1.12 doc [1] to define your csv table, everything
>>> should work just fine.
>>>
>>> *Flink SQL> set table.dml-sync=true;*
>>>
>>> *[INFO] Session property has been set.*
>>>
>>>
>>> *Flink SQL> select * from csv;*
>>>
>>> *+--+--+*
>>>
>>> *|   id | name |*
>>>
>>> *+--+--+*
>>>
>>> *|3 |c |*
>>>
>>> *+--+--+*
>>>
>>> *Received a total of 1 row*
>>>
>>>
>>> *Flink SQL> insert overwrite csv values(4, 'd');*
>>>
>>> *[INFO] Submitting SQL update statement to the cluster...*
>>>
>>> *[INFO] Execute statement in sync mode. Please wait for the
>>> execution finish...*
>>>
>>> *[INFO] Complete execution of the SQL update statement.*
>>>
>>>
>>> *Flink SQL> select * from csv;*
>>>
>>> *+--+--+*
>>>
>>> *|   id | name |*
>>>
>>> *+--+--+*
>>>
>>> *|4 |d |*
>>>
>>> *+--+--+*
>>>
>>> *Received a total of 1 row*
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
 Hi Till,
 since I was using the same WITH-clause both for reading and 

Re: 回复: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-09 Thread Arvid Heise
Afaik the main issue is that the JDBC drivers are leaking as they usually
assume only one classloader. If you are aware of it, you can bundle it in
your jar. However, you are right - it doesn't help with OP, so it was
probably not a good idea.

On Fri, Apr 9, 2021 at 11:45 AM Maciek Próchniak  wrote:

> Hi Arvid,
>
> "You can still bundle it into your jar if you prefer it." - is it really
> the case with JDBC drivers? I think that if the driver is not on Flink main
> classpath (that is, in the lib folder) there is no way the class would be
> loaded by main classloader - regardless of parent/child classloader setting?
>
> Those settings will help if the driver is both on Flink classpath and in
> user jar - I noticed now the documentation is slightly misleading
> suggesting otherwise, isn't it?
>
>
> thanks,
>
> maciek
>
>
> On 09.04.2021 11:25, Arvid Heise wrote:
>
> Hi,
>
> What do you mean by light-weight way? Just to clarify: you copy the jar
> once in the lib folder and restart the cluster once (and put it into the
> lib/ for future clusters). Not sure how it would be more light-weight.
>
> You can still bundle it into your jar if you prefer it. It just tends to
> be big but if it's easier for you to not touch the cluster, then just put
> everything into your jar.
>
> On Fri, Apr 9, 2021 at 4:08 AM 太平洋 <495635...@qq.com> wrote:
>
>> I have tried  to add 'classloader.parent-first-patterns.additional:
>> "ru.yandex.clickhouse" ' to flink-config, but problem still exist.
>> Is there lightweight way to put clickhouse JDBC driver on Flink lib/
>> folder?
>>
>>
>> -- 原始邮件 --
>> *发件人:* "Maciek Próchniak" ;
>> *发送时间:* 2021年4月9日(星期五) 凌晨3:24
>> *收件人:* "太平洋"<495635...@qq.com>;"Arvid Heise";"Yangze
>> Guo";
>> *抄送:* "user";"guowei.mgw"> >;"renqschn";
>> *主题:* Re: 回复: period batch job lead to OutOfMemoryError: Metaspace
>> problem
>>
>> Hi,
>>
>> Did you put the clickhouse JDBC driver on Flink main classpath (in lib
>> folder) and not in user-jar - as described here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/debugging/debugging_classloading.html#unloading-of-dynamically-loaded-classes-in-user-code
>> ?
>>
>> When we encountered Metaspace leaks recently, in quite a few cases it
>> turned out that the problem was the JDBC driver in user classloder which
>> was registered by DriverManager and caused classloader leak.
>>
>>
>> maciek
>>
>>
>> On 08.04.2021 11:42, 太平洋 wrote:
>>
>> My application program looks like this. Does this structure has some
>> problem?
>>
>> public class StreamingJob {
>> public static void main(String[] args) throws Exception {
>> int i = 0;
>> while (i < 100) {
>> try {
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>> env.setParallelism(Parallelism);
>>
>> EnvironmentSettings bsSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner()
>> .inStreamingMode().build();
>> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env,
>> bsSettings);
>>
>> bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
>> Table t = bsTableEnv.sqlQuery(query);
>>
>> DataStream points = bsTableEnv.toAppendStream(t,
>> DataPoint.class);
>>
>> DataStream weightPoints = points.map();
>>
>> DataStream predictPoints = weightPoints.keyBy()
>> .reduce().map();
>>
>> // side output
>> final OutputTag outPutPredict = new
>> OutputTag("predict") {
>> };
>>
>> SingleOutputStreamOperator mainDataStream = predictPoints
>> .process();
>>
>> DataStream exStream =
>> mainDataStream.getSideOutput(outPutPredict);
>>
>> //write data to clickhouse
>> String insertIntoCKSql = "xxx";
>> mainDataStream.addSink(JdbcSink.sink(insertIntoCKSql, new CkSinkBuilder(),
>> new JdbcExecutionOptions.Builder().withBatchSize(CkBatchSize).build(),
>> new
>> JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(CkDriverName)
>> .withUrl(CkUrl).withUsername(CkUser).withPassword(CkPassword).build()));
>>
>> // write data to kafka
>> FlinkKafkaProducer producer = new FlinkKafkaProducer<>();
>> exStream.map().addSink(producer);
>>
>> env.execute("Prediction Program");
>> } catch (Exception e) {
>> e.printStackTrace();
>> }
>> i++;
>> Thread.sleep(window * 1000);
>> }
>> }
>> }
>>
>>
>>
>> -- 原始邮件 --
>> *发件人:* "Arvid Heise"  ;
>> *发送时间:* 2021年4月8日(星期四) 下午2:33
>> *收件人:* "Yangze Guo" ;
>> *抄送:* "太平洋"<495635...@qq.com> <495635...@qq.com>;"user"
>>  ;"guowei.mgw"
>>  ;"renqschn"
>>  ;
>> *主题:* Re: period batch job lead to OutOfMemoryError: Metaspace problem
>>
>> Hi,
>>
>> ChildFirstClassLoader are created (more or less) by application jar and
>> seeing so many looks like a classloader leak to me. I'd expect you to see a
>> new ChildFirstClassLoader popping up with each new job submission.
>>
>> Can you check who is referencing the ChildFirstClassLoader transitively?
>> Usually, it's some thread that is 

Query regarding flink metric types

2021-04-09 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hi Community,

Need some information regarding metrics type mentioned in flink documentation.
https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html

For the checkpoint metrics, below metrics are defined as of type gauge. As per 
my understanding gauge type is used to represent a value which can 
increase/decrease whereas counter is used to represent a value which will keep 
increasing. Below metrics will be keep increasing during the job run. Hence 
counter can be appropriate metric type for these. Please share your input on 
this.

numberOfCompletedCheckpoints
The number of successfully completed checkpoints.
Gauge
numberOfFailedCheckpoints
The number of failed checkpoints.
Gauge
totalNumberOfCheckpoints
The number of total checkpoints (in progress, completed, failed).
Gauge

Also "isBackPressured"  metric by the name it indicates as it returns boolean 
value Yes/No. Flink documentation says backpressure is measured as below,

  *   OK: 0 <= Ratio <= 0.10
  *   LOW: 0.10 < Ratio <= 0.5
  *   HIGH: 0.5 < Ratio <= 1
What exactly this metric reports ?

isBackPressured
Whether the task is back-pressured.
Gauge

Thanks,
Suchithra


Re: Flink 1.13 and CSV (batch) writing

2021-04-09 Thread Kurt Young
Converting from table to DataStream in batch mode is indeed a problem now.
But I think this will
be improved soon.

Best,
Kurt


On Fri, Apr 9, 2021 at 6:14 PM Flavio Pompermaier 
wrote:

> In my real CSV I have LONG columns that can contain null values. In that
> case I get a parse exception (and I would like to avoid to read it as a
> string).
> The ',bye' is just the way you can test that in my example (add that line
> to the input csv).
> If I use  'csv.null-literal' = '' it seems to work but, is it a workaround
> or it is the right solution?
>
> Another big problem I'm having with the new APIs is that if I use
> TableEnvironment tableEnv = TableEnvironment.create(envSettings);
> then I can't convert a table to a datastream..I need to use
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(streamEnv, envSettings);
> but in that case I can't use inBatchMode..
>
> On Fri, Apr 9, 2021 at 11:44 AM Kurt Young  wrote:
>
>> `format.ignore-first-line` is unfortunately a regression compared to the
>> old one.
>> I've created a ticket [1] to track this but according to current design,
>> it seems not easy to do.
>>
>> Regarding null values, I'm not sure if I understand the issue you had.
>> What do you mean by
>> using ',bye' to test null Long values?
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-22178
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier 
>> wrote:
>>
>>> And another thing: in my csv I added ',bye' (to test null Long values)
>>> but I get a parse error..if I add  'csv.null-literal' = '' it seems to
>>> work..is that the right way to solve this problem?
>>>
>>> On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier 
>>> wrote:
>>>
 Thanks Kurt, now it works. However I can't find a way to skip the CSV
 header..before there was  "format.ignore-first-line" but now I can't find
 another way to skip it.
 I could set csv.ignore-parse-errors to true but then I can't detect
 other parsing errors, otherwise I need to manually transofrm the header
 into a comment adding the # character at the start of the line..
 How can I solve that?

 On Fri, Apr 9, 2021 at 4:07 AM Kurt Young  wrote:

> My DDL is:
>
> CREATE TABLE csv (
>id BIGINT,
>name STRING
> ) WITH (
>'connector' = 'filesystem',
>'path' = '.',
>'format' = 'csv'
> );
>
> Best,
> Kurt
>
>
> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young  wrote:
>
>> Hi Flavio,
>>
>> We would recommend you to use new table source & sink interfaces,
>> which have different
>> property keys compared to the old ones, e.g. 'connector' v.s.
>> 'connector.type'.
>>
>> You can follow the 1.12 doc [1] to define your csv table, everything
>> should work just fine.
>>
>> *Flink SQL> set table.dml-sync=true;*
>>
>> *[INFO] Session property has been set.*
>>
>>
>> *Flink SQL> select * from csv;*
>>
>> *+--+--+*
>>
>> *|   id | name |*
>>
>> *+--+--+*
>>
>> *|3 |c |*
>>
>> *+--+--+*
>>
>> *Received a total of 1 row*
>>
>>
>> *Flink SQL> insert overwrite csv values(4, 'd');*
>>
>> *[INFO] Submitting SQL update statement to the cluster...*
>>
>> *[INFO] Execute statement in sync mode. Please wait for the execution
>> finish...*
>>
>> *[INFO] Complete execution of the SQL update statement.*
>>
>>
>> *Flink SQL> select * from csv;*
>>
>> *+--+--+*
>>
>> *|   id | name |*
>>
>> *+--+--+*
>>
>> *|4 |d |*
>>
>> *+--+--+*
>>
>> *Received a total of 1 row*
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>>> Hi Till,
>>> since I was using the same WITH-clause both for reading and writing
>>> I discovered that overwrite is actually supported in the Sinks, while in
>>> the Sources an exception is thrown (I was thinking that those properties
>>> were simply ignored).
>>> However the quote-character is not supported in the sinks: is this a
>>> bug or is it the intended behaviour?.
>>> Here is a minimal example that reproduce the problem (put in the
>>> /tmp/test.csv something like '1,hello' or '2,hi').
>>>
>>> import 

Re: Flink: Exception from container-launch exitCode=2

2021-04-09 Thread Till Rohrmann
I actually think that the logging problem is caused by Hadoop 2.7.3 which
pulls in the slf4j-log4j12-1.7.10.jar. This binding is then used but there
is no proper configuration file for log4j because Flink actually uses
log4j2.

Cheers,
Till

On Fri, Apr 9, 2021 at 12:05 PM Till Rohrmann  wrote:

> Hi Yik San,
>
> to me it looks as if there is a problem with the job and the deployment.
> Unfortunately, the logging seems to not have worked. Could you check that
> you have a valid log4j.properties file in your conf directory.
>
> Cheers,
> Till
>
> On Wed, Apr 7, 2021 at 4:57 AM Yik San Chan 
> wrote:
>
>> *The question is cross-posted on Stack
>> Overflow 
>> https://stackoverflow.com/questions/66968180/flink-exception-from-container-launch-exitcode-2
>> .
>> Viewing the question on Stack Overflow is preferred as I include a few
>> images for better description.*
>>
>> Hi community,
>>
>> ## Flink (Scala) exitCode=2
>>
>> I have a simple Flink job that reads from 2 columns of a Hive table
>> `mysource`, add up the columns, then writes the result to another Hive
>> table `mysink`, which `mysource` has 2 columns `a bigint` and `b bigint`,
>> and `mysink` has only 1 column `c bigint`.
>>
>> The job submits successfully, however, I observe it keeps retrying.
>>
>> [![enter image description here][1]][1]
>>
>> I click into each attempt, they simply show this.
>>
>> ```
>> AM Container for appattempt_1607399514900_2511_001267 exited with
>> exitCode: 2
>> For more detailed output, check application tracking page:
>> http://cn-hz-h-test-data-flink00:8088/cluster/app/application_1607399514900_2511Then,
>> click on links to logs of each attempt.
>> Diagnostics: Exception from container-launch.
>> Container id: container_e13_1607399514900_2511_1267_01
>> Exit code: 2
>> Stack trace: ExitCodeException exitCode=2:
>> at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
>> at org.apache.hadoop.util.Shell.run(Shell.java:479)
>> at
>> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
>> at
>> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
>> at
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
>> at
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Container exited with a non-zero exit code 2
>> Failing this attempt
>> ```
>>
>> However, the "Logs" has no useful info - it complains about the logging
>> lib, but I believe they are really warnings, not errors.
>>
>> ```
>> LogType:jobmanager.err
>> Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
>> LogLength:1010
>> Log Contents:
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/usercache/zhongtai/appcache/application_1607399514900_2509/filecache/10/featurepipelines-0.1.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]/
>> SLF4J: Found binding in
>> [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/filecache/302/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/data/apache/hadoop/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> log4j:WARN No appenders could be found for logger
>> (org.apache.flink.runtime.entrypoint.ClusterEntrypoint).
>> log4j:WARN Please initialize the log4j system properly.
>> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
>> more info.
>> End of LogType:jobmanager.err
>>
>> LogType:jobmanager.out
>> Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
>> LogLength:0
>> Log Contents:
>> End of LogType:jobmanager.out
>> ```
>>
>> This is the job written in Scala.
>>
>> ```scala
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
>> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
>> import org.apache.flink.table.catalog.hive.HiveCatalog
>>
>> object HiveToyExample {
>>   def main(args: Array[String]): Unit = {
>> val settings = EnvironmentSettings.newInstance.build
>> val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> val tableEnv = StreamTableEnvironment.create(execEnv, settings)
>>
>> val 

Re: Flink 1.13 and CSV (batch) writing

2021-04-09 Thread Flavio Pompermaier
In my real CSV I have LONG columns that can contain null values. In that
case I get a parse exception (and I would like to avoid to read it as a
string).
The ',bye' is just the way you can test that in my example (add that line
to the input csv).
If I use  'csv.null-literal' = '' it seems to work but, is it a workaround
or it is the right solution?

Another big problem I'm having with the new APIs is that if I use
TableEnvironment tableEnv = TableEnvironment.create(envSettings);
then I can't convert a table to a datastream..I need to use
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(streamEnv, envSettings);
but in that case I can't use inBatchMode..

On Fri, Apr 9, 2021 at 11:44 AM Kurt Young  wrote:

> `format.ignore-first-line` is unfortunately a regression compared to the
> old one.
> I've created a ticket [1] to track this but according to current design,
> it seems not easy to do.
>
> Regarding null values, I'm not sure if I understand the issue you had.
> What do you mean by
> using ',bye' to test null Long values?
>
> [1] https://issues.apache.org/jira/browse/FLINK-22178
>
> Best,
> Kurt
>
>
> On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier 
> wrote:
>
>> And another thing: in my csv I added ',bye' (to test null Long values)
>> but I get a parse error..if I add  'csv.null-literal' = '' it seems to
>> work..is that the right way to solve this problem?
>>
>> On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier 
>> wrote:
>>
>>> Thanks Kurt, now it works. However I can't find a way to skip the CSV
>>> header..before there was  "format.ignore-first-line" but now I can't find
>>> another way to skip it.
>>> I could set csv.ignore-parse-errors to true but then I can't detect
>>> other parsing errors, otherwise I need to manually transofrm the header
>>> into a comment adding the # character at the start of the line..
>>> How can I solve that?
>>>
>>> On Fri, Apr 9, 2021 at 4:07 AM Kurt Young  wrote:
>>>
 My DDL is:

 CREATE TABLE csv (
id BIGINT,
name STRING
 ) WITH (
'connector' = 'filesystem',
'path' = '.',
'format' = 'csv'
 );

 Best,
 Kurt


 On Fri, Apr 9, 2021 at 10:00 AM Kurt Young  wrote:

> Hi Flavio,
>
> We would recommend you to use new table source & sink interfaces,
> which have different
> property keys compared to the old ones, e.g. 'connector' v.s.
> 'connector.type'.
>
> You can follow the 1.12 doc [1] to define your csv table, everything
> should work just fine.
>
> *Flink SQL> set table.dml-sync=true;*
>
> *[INFO] Session property has been set.*
>
>
> *Flink SQL> select * from csv;*
>
> *+--+--+*
>
> *|   id | name |*
>
> *+--+--+*
>
> *|3 |c |*
>
> *+--+--+*
>
> *Received a total of 1 row*
>
>
> *Flink SQL> insert overwrite csv values(4, 'd');*
>
> *[INFO] Submitting SQL update statement to the cluster...*
>
> *[INFO] Execute statement in sync mode. Please wait for the execution
> finish...*
>
> *[INFO] Complete execution of the SQL update statement.*
>
>
> *Flink SQL> select * from csv;*
>
> *+--+--+*
>
> *|   id | name |*
>
> *+--+--+*
>
> *|4 |d |*
>
> *+--+--+*
>
> *Received a total of 1 row*
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>
> Best,
> Kurt
>
>
> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
>
>> Hi Till,
>> since I was using the same WITH-clause both for reading and writing I
>> discovered that overwrite is actually supported in the Sinks, while in 
>> the
>> Sources an exception is thrown (I was thinking that those properties were
>> simply ignored).
>> However the quote-character is not supported in the sinks: is this a
>> bug or is it the intended behaviour?.
>> Here is a minimal example that reproduce the problem (put in the
>> /tmp/test.csv something like '1,hello' or '2,hi').
>>
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.TableEnvironment;
>>
>> public class FlinkCsvTest {
>>   public static void main(String[] args) throws Exception {
>> final EnvironmentSettings envSettings =
>>
>> 

Re: Flink: Exception from container-launch exitCode=2

2021-04-09 Thread Till Rohrmann
Hi Yik San,

to me it looks as if there is a problem with the job and the deployment.
Unfortunately, the logging seems to not have worked. Could you check that
you have a valid log4j.properties file in your conf directory.

Cheers,
Till

On Wed, Apr 7, 2021 at 4:57 AM Yik San Chan 
wrote:

> *The question is cross-posted on Stack
> Overflow 
> https://stackoverflow.com/questions/66968180/flink-exception-from-container-launch-exitcode-2
> .
> Viewing the question on Stack Overflow is preferred as I include a few
> images for better description.*
>
> Hi community,
>
> ## Flink (Scala) exitCode=2
>
> I have a simple Flink job that reads from 2 columns of a Hive table
> `mysource`, add up the columns, then writes the result to another Hive
> table `mysink`, which `mysource` has 2 columns `a bigint` and `b bigint`,
> and `mysink` has only 1 column `c bigint`.
>
> The job submits successfully, however, I observe it keeps retrying.
>
> [![enter image description here][1]][1]
>
> I click into each attempt, they simply show this.
>
> ```
> AM Container for appattempt_1607399514900_2511_001267 exited with
> exitCode: 2
> For more detailed output, check application tracking page:
> http://cn-hz-h-test-data-flink00:8088/cluster/app/application_1607399514900_2511Then,
> click on links to logs of each attempt.
> Diagnostics: Exception from container-launch.
> Container id: container_e13_1607399514900_2511_1267_01
> Exit code: 2
> Stack trace: ExitCodeException exitCode=2:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
> at org.apache.hadoop.util.Shell.run(Shell.java:479)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
> at
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Container exited with a non-zero exit code 2
> Failing this attempt
> ```
>
> However, the "Logs" has no useful info - it complains about the logging
> lib, but I believe they are really warnings, not errors.
>
> ```
> LogType:jobmanager.err
> Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
> LogLength:1010
> Log Contents:
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/usercache/zhongtai/appcache/application_1607399514900_2509/filecache/10/featurepipelines-0.1.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]/
> SLF4J: Found binding in
> [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/filecache/302/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/data/apache/hadoop/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> log4j:WARN No appenders could be found for logger
> (org.apache.flink.runtime.entrypoint.ClusterEntrypoint).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
> more info.
> End of LogType:jobmanager.err
>
> LogType:jobmanager.out
> Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
> LogLength:0
> Log Contents:
> End of LogType:jobmanager.out
> ```
>
> This is the job written in Scala.
>
> ```scala
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.table.catalog.hive.HiveCatalog
>
> object HiveToyExample {
>   def main(args: Array[String]): Unit = {
> val settings = EnvironmentSettings.newInstance.build
> val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = StreamTableEnvironment.create(execEnv, settings)
>
> val hiveCatalog = new HiveCatalog(
>   "myhive",
>   "aiinfra",
>   "/data/apache/hive/apache-hive-2.1.0-bin/conf/"
> )
> tableEnv.registerCatalog("myhive", hiveCatalog)
> tableEnv.useCatalog("myhive")
>
> tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
>
> tableEnv
>   .executeSql("""
>   |INSERT INTO mysink
>   |SELECT a + b
>   |FROM mysource
>   

Re: 回复: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-09 Thread Maciek Próchniak

Hi Arvid,

"You can still bundle it into your jar if you prefer it." - is it really 
the case with JDBC drivers? I think that if the driver is not on Flink 
main classpath (that is, in the lib folder) there is no way the class 
would be loaded by main classloader - regardless of parent/child 
classloader setting?


Those settings will help if the driver is both on Flink classpath and in 
user jar - I noticed now the documentation is slightly misleading 
suggesting otherwise, isn't it?



thanks,

maciek


On 09.04.2021 11:25, Arvid Heise wrote:

Hi,

What do you mean by light-weight way? Just to clarify: you copy the 
jar once in the lib folder and restart the cluster once (and put it 
into the lib/ for future clusters). Not sure how it would be more 
light-weight.


You can still bundle it into your jar if you prefer it. It just tends 
to be big but if it's easier for you to not touch the cluster, then 
just put everything into your jar.


On Fri, Apr 9, 2021 at 4:08 AM 太平洋 <495635...@qq.com 
> wrote:


I have tried  to add
'classloader.parent-first-patterns.additional:
"ru.yandex.clickhouse" ' to flink-config, but problem still exist.
Is there lightweight way to put clickhouse JDBC driver on Flink
lib/ folder?

-- 原始邮件 --
*发件人:* "Maciek Próchniak" mailto:m...@touk.pl>>;
*发送时间:* 2021年4月9日(星期五) 凌晨3:24
*收件人:* "太平洋"<495635...@qq.com >;"Arvid
Heise"mailto:ar...@apache.org>>;"Yangze
Guo"mailto:karma...@gmail.com>>;
*抄送:* "user"mailto:user@flink.apache.org>>;"guowei.mgw"mailto:guowei@gmail.com>>;"renqschn"mailto:renqs...@gmail.com>>;
*主题:* Re: 回复: period batch job lead to OutOfMemoryError:
Metaspace problem

Hi,

Did you put the clickhouse JDBC driver on Flink main classpath (in
lib folder) and not in user-jar - as described here:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/debugging/debugging_classloading.html#unloading-of-dynamically-loaded-classes-in-user-code

?

When we encountered Metaspace leaks recently, in quite a few cases
it turned out that the problem was the JDBC driver in user
classloder which was registered by DriverManager and caused
classloader leak.


maciek


On 08.04.2021 11:42, 太平洋 wrote:

My application program looks like this. Does this structure has
some problem?

public class StreamingJob {
public static void main(String[] args) throws Exception {
int i = 0;
while (i < 100) {
try {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setParallelism(Parallelism);

EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner()
.inStreamingMode().build();
StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(env, bsSettings);

bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
Table t = bsTableEnv.sqlQuery(query);

DataStream points = bsTableEnv.toAppendStream(t,
DataPoint.class);

DataStream weightPoints = points.map();

DataStream predictPoints = weightPoints.keyBy()
.reduce().map();

// side output
final OutputTag outPutPredict = new
OutputTag("predict") {
};

SingleOutputStreamOperator mainDataStream =
predictPoints
.process();

DataStream exStream =
mainDataStream.getSideOutput(outPutPredict);

                                        //write data to clickhouse
String insertIntoCKSql = "xxx";
mainDataStream.addSink(JdbcSink.sink(insertIntoCKSql, new
CkSinkBuilder(),
new
JdbcExecutionOptions.Builder().withBatchSize(CkBatchSize).build(),
new

JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(CkDriverName)
.withUrl(CkUrl).withUsername(CkUser).withPassword(CkPassword).build()));

// write data to kafka
FlinkKafkaProducer producer = new FlinkKafkaProducer<>();
exStream.map().addSink(producer);

env.execute("Prediction Program");
} catch (Exception e) {
e.printStackTrace();
}
i++;
Thread.sleep(window * 1000);
}
}
}



-- 原始邮件 --
*发件人:* "Arvid Heise"  ;
*发送时间:* 2021年4月8日(星期四) 下午2:33
*收件人:* "Yangze Guo"
;
*抄送:* "太平洋"<495635...@qq.com>
;"user"
;"guowei.mgw"
;"renqschn"
;
*主题:* Re: period batch job lead to OutOfMemoryError: Metaspace
problem

Hi,

ChildFirstClassLoader are created (more or less) by application
jar and seeing so many looks 

Re: Flink 1.13 and CSV (batch) writing

2021-04-09 Thread Kurt Young
`format.ignore-first-line` is unfortunately a regression compared to the
old one.
I've created a ticket [1] to track this but according to current design, it
seems not easy to do.

Regarding null values, I'm not sure if I understand the issue you had. What
do you mean by
using ',bye' to test null Long values?

[1] https://issues.apache.org/jira/browse/FLINK-22178

Best,
Kurt


On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier 
wrote:

> And another thing: in my csv I added ',bye' (to test null Long values) but
> I get a parse error..if I add  'csv.null-literal' = '' it seems to work..is
> that the right way to solve this problem?
>
> On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier 
> wrote:
>
>> Thanks Kurt, now it works. However I can't find a way to skip the CSV
>> header..before there was  "format.ignore-first-line" but now I can't find
>> another way to skip it.
>> I could set csv.ignore-parse-errors to true but then I can't detect other
>> parsing errors, otherwise I need to manually transofrm the header into a
>> comment adding the # character at the start of the line..
>> How can I solve that?
>>
>> On Fri, Apr 9, 2021 at 4:07 AM Kurt Young  wrote:
>>
>>> My DDL is:
>>>
>>> CREATE TABLE csv (
>>>id BIGINT,
>>>name STRING
>>> ) WITH (
>>>'connector' = 'filesystem',
>>>'path' = '.',
>>>'format' = 'csv'
>>> );
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young  wrote:
>>>
 Hi Flavio,

 We would recommend you to use new table source & sink interfaces, which
 have different
 property keys compared to the old ones, e.g. 'connector' v.s.
 'connector.type'.

 You can follow the 1.12 doc [1] to define your csv table, everything
 should work just fine.

 *Flink SQL> set table.dml-sync=true;*

 *[INFO] Session property has been set.*


 *Flink SQL> select * from csv;*

 *+--+--+*

 *|   id | name |*

 *+--+--+*

 *|3 |c |*

 *+--+--+*

 *Received a total of 1 row*


 *Flink SQL> insert overwrite csv values(4, 'd');*

 *[INFO] Submitting SQL update statement to the cluster...*

 *[INFO] Execute statement in sync mode. Please wait for the execution
 finish...*

 *[INFO] Complete execution of the SQL update statement.*


 *Flink SQL> select * from csv;*

 *+--+--+*

 *|   id | name |*

 *+--+--+*

 *|4 |d |*

 *+--+--+*

 *Received a total of 1 row*

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html

 Best,
 Kurt


 On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier 
 wrote:

> Hi Till,
> since I was using the same WITH-clause both for reading and writing I
> discovered that overwrite is actually supported in the Sinks, while in the
> Sources an exception is thrown (I was thinking that those properties were
> simply ignored).
> However the quote-character is not supported in the sinks: is this a
> bug or is it the intended behaviour?.
> Here is a minimal example that reproduce the problem (put in the
> /tmp/test.csv something like '1,hello' or '2,hi').
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class FlinkCsvTest {
>   public static void main(String[] args) throws Exception {
> final EnvironmentSettings envSettings =
>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> final TableEnvironment tableEnv =
> TableEnvironment.create(envSettings);
> // ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> // BatchTableEnvironment tableEnv =
> BatchTableEnvironment.create(env);
> final String tableInName = "testTableIn";
> final String createInTableDdl = getSourceDdl(tableInName,
> "/tmp/test.csv"); //
>
> final String tableOutName = "testTableOut";
> final String createOutTableDdl = getSinkDdl(tableOutName,
> "/tmp/test-out.csv"); //
> tableEnv.executeSql(createInTableDdl);
> tableEnv.executeSql(createOutTableDdl);
>
> Table tableIn = tableEnv.from(tableInName);
> Table tableOut = tableEnv.from(tableOutName);
> tableIn.insertInto(tableOutName);
> // tableEnv.toDataSet(table, 

Re: 求问Hive DDL TBLPROPERTIES不生效

2021-04-09 Thread Rui Li
1. watermark的问题需要检查一下source,比如watermark是如何定义的、是不是source没数据导致watermark不前进等。
2. 小文件合并的功能Hive跟FileSystem connector都是支持的,可以参考这个文档配置一下试试:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#file-compaction

On Fri, Apr 9, 2021 at 1:04 PM HunterXHunter <1356469...@qq.com> wrote:

> 你好,
> 1:我设置的时候就是 使用的 partition-time 同时
> 设定checkpoint间隔为60s。但是我发现watermark一直没有生成或者更新,导致我的数据一直无法commit。想知道
> 为什么watermark无法生成。当时使用process-time是没问题的。
> 2:因为写hive的话会有小文件的问题。所以我使用file sink来设置合并文件和控制文件大小。但是写文件是无法写hive
> metastore。所以hive查不出数据。
>
> 想知道有什么方法解决hive小文件问题,难道只能T+1做小文件合并吗。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best regards!
Rui Li


Re: how to submit jobs remotely when a rest proxy like nginx is used and REST endpoint is bind to loopback interface?

2021-04-09 Thread Arvid Heise
Hi Ming,

instead of using the command line interface to run Flink applications, you
should use the REST API [1].
You should first upload your jar in one call and then execute the job in
the second call.

The rest endpoint would be http://10.20.39.43:8080/


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html#jars-jarid-run

On Fri, Apr 9, 2021 at 4:45 AM Ming Li  wrote:

> Hi,
>
> The flink official document clearly states that "Simple mutual
> authentication may be enabled by configuration if authentication of
> connections to the REST endpoint is required, but we recommend to deploy a
> “side car proxy”: Bind the REST endpoint to the loopback interface (or the
> pod-local interface in Kubernetes) and start a REST proxy that
> authenticates and forwards the requests to Flink. Examples for proxies that
> Flink users have deployed are Envoy Proxy  or 
> NGINX
> with MOD_AUTH
> ."
>
>
>
> So I am wondering, in standalone mode when HA is not enabeld, when a rest
> proxy like nginx is used, and rest endpoint is bind to the loopback
> interface, how should we submit jobs remotely?
>
>
>
> ps.
>
> 1. sample flink-conf.yaml settings, and nginx settings are as below
> showing:
>
> rest.bind-port: 9091/rest.bind-address: 127.0.0.1 (this is the port and ip
> where the rest endpoint bind itself to in the host where it is started)
>
> rest.port: 9091/rest.address: 127.0.0.1 (this is the port and ip used by
> rest clients when submit requests, so basically it should reach the above
> rest.bind-port/rest.bind-address)
>
> [image: image.png]
>
> 2. I know that we can use curl to request the nginx proxy, which
> authenticates and forwards the request to flink, as below showing: curl
> -v -u user1:user1 http://10.20.39.43:8080/config (which is the address
> where nginx is listening to)
>
> 3. I know that  we can submit jobs from the host where job manager is
> located, as below showing:
>
> /opt/flink-1.12.2/bin/flink run -m 127.0.0.1:9091
> /opt/flink-1.12.2/examples/batch/WordCount.jar --input /tmp/README.txt
> --output /tmp/flink.test.txt11  ()
>
> Thanks!
> --
> Best Regards
> Michael Li
>


Re: 回复: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-09 Thread Arvid Heise
Hi,

What do you mean by light-weight way? Just to clarify: you copy the jar
once in the lib folder and restart the cluster once (and put it into the
lib/ for future clusters). Not sure how it would be more light-weight.

You can still bundle it into your jar if you prefer it. It just tends to be
big but if it's easier for you to not touch the cluster, then just put
everything into your jar.

On Fri, Apr 9, 2021 at 4:08 AM 太平洋 <495635...@qq.com> wrote:

> I have tried  to add 'classloader.parent-first-patterns.additional:
> "ru.yandex.clickhouse" ' to flink-config, but problem still exist.
> Is there lightweight way to put clickhouse JDBC driver on Flink lib/
> folder?
>
>
> -- 原始邮件 --
> *发件人:* "Maciek Próchniak" ;
> *发送时间:* 2021年4月9日(星期五) 凌晨3:24
> *收件人:* "太平洋"<495635...@qq.com>;"Arvid Heise";"Yangze
> Guo";
> *抄送:* "user";"guowei.mgw" >;"renqschn";
> *主题:* Re: 回复: period batch job lead to OutOfMemoryError: Metaspace problem
>
> Hi,
>
> Did you put the clickhouse JDBC driver on Flink main classpath (in lib
> folder) and not in user-jar - as described here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/debugging/debugging_classloading.html#unloading-of-dynamically-loaded-classes-in-user-code
> ?
>
> When we encountered Metaspace leaks recently, in quite a few cases it
> turned out that the problem was the JDBC driver in user classloder which
> was registered by DriverManager and caused classloader leak.
>
>
> maciek
>
>
> On 08.04.2021 11:42, 太平洋 wrote:
>
> My application program looks like this. Does this structure has some
> problem?
>
> public class StreamingJob {
> public static void main(String[] args) throws Exception {
> int i = 0;
> while (i < 100) {
> try {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> env.setParallelism(Parallelism);
>
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner()
> .inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env,
> bsSettings);
>
> bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
> Table t = bsTableEnv.sqlQuery(query);
>
> DataStream points = bsTableEnv.toAppendStream(t,
> DataPoint.class);
>
> DataStream weightPoints = points.map();
>
> DataStream predictPoints = weightPoints.keyBy()
> .reduce().map();
>
> // side output
> final OutputTag outPutPredict = new
> OutputTag("predict") {
> };
>
> SingleOutputStreamOperator mainDataStream = predictPoints
> .process();
>
> DataStream exStream =
> mainDataStream.getSideOutput(outPutPredict);
>
> //write data to clickhouse
> String insertIntoCKSql = "xxx";
> mainDataStream.addSink(JdbcSink.sink(insertIntoCKSql, new CkSinkBuilder(),
> new JdbcExecutionOptions.Builder().withBatchSize(CkBatchSize).build(),
> new
> JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(CkDriverName)
> .withUrl(CkUrl).withUsername(CkUser).withPassword(CkPassword).build()));
>
> // write data to kafka
> FlinkKafkaProducer producer = new FlinkKafkaProducer<>();
> exStream.map().addSink(producer);
>
> env.execute("Prediction Program");
> } catch (Exception e) {
> e.printStackTrace();
> }
> i++;
> Thread.sleep(window * 1000);
> }
> }
> }
>
>
>
> -- 原始邮件 --
> *发件人:* "Arvid Heise"  ;
> *发送时间:* 2021年4月8日(星期四) 下午2:33
> *收件人:* "Yangze Guo" ;
> *抄送:* "太平洋"<495635...@qq.com> <495635...@qq.com>;"user"
>  ;"guowei.mgw"
>  ;"renqschn"
>  ;
> *主题:* Re: period batch job lead to OutOfMemoryError: Metaspace problem
>
> Hi,
>
> ChildFirstClassLoader are created (more or less) by application jar and
> seeing so many looks like a classloader leak to me. I'd expect you to see a
> new ChildFirstClassLoader popping up with each new job submission.
>
> Can you check who is referencing the ChildFirstClassLoader transitively?
> Usually, it's some thread that is lingering around because some third party
> library is leaking threads etc.
>
> OneInputStreamTask is legit and just indicates that you have a job running
> with 4 slots on that TM. It should not hold any dedicated metaspace memory.
>
> On Thu, Apr 8, 2021 at 4:52 AM Yangze Guo  wrote:
>
>> I went through the JM & TM logs but could not find any valuable clue.
>> The exception is actually thrown by kafka-producer-network-thread.
>> Maybe @Qingsheng could also take a look?
>>
>>
>> Best,
>> Yangze Guo
>>
>> On Thu, Apr 8, 2021 at 10:39 AM 太平洋 <495635...@qq.com> wrote:
>> >
>> > I have configured to 512M, but problem still exist. Now the memory size
>> is still 256M.
>> > Attachments are TM and JM logs.
>> >
>> > Look forward to your reply.
>> >
>> > -- 原始邮件 --
>> > 发件人: "Yangze Guo" ;
>> > 发送时间: 2021年4月6日(星期二) 晚上6:35
>> > 收件人: "太平洋"<495635...@qq.com>;
>> > 抄送: "user";"guowei.mgw";
>> > 主题: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>> >
>> > > I 

Re: Dynamic configuration via broadcast state

2021-04-09 Thread Arvid Heise
Hi Vishal,

what you are trying to achieve is quite common and has its own
documentation [1]. Currently, there is no way to hold back elements of the
non-broadcast side (your question 2 in OP), so you have to save them until
configuration arrives.

If you have several configurable operators, you could try to create a
generic configuration holder and chain the actual operator to it [2] or you
create a base class that does all the work and you just override how the
configuration is applied to all elements.

For sources, you have to implement your own source, for sinks you can use
the same chaining trick.

I currently don't see how you can use watermarks can help. We are still in
process of providing a way to synchronize sources with different timestamps
automatically and it will arrive not before Flink 1.14.

---

If configuration changes are quite rare, there is an easier option for you
that is viable if your state is not huge: you could simply load
configuration statically in `open` and fail on configuration change to
trigger a recovery. That keeps the whole DataStream simple at the cost of
additional recoveries.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Async-Broadcast-tt42807.html

On Wed, Apr 7, 2021 at 12:37 AM vishalovercome  wrote:

> I researched a bit more and another suggested solution is to build a custom
> source function that somehow waits for each operator to load it's
> configuration which is infact set in the open method of the source itself.
> I'm not sure if that's a good idea as that just exposes entire job
> configuration to an operator.
>
> Can we leverage watermarks/idle sources somehow? Basically set the
> timestamp
> of configuration stream to a very low number at the start and then force it
> to be read before data from other sources start flowing in. As
> configurations aren't going to change frequently we can idle these sources.
>
> 1. Is the above approach even possible?
> 2. Can an idle source resume once configuration changes?
>
> A rough sketch of timestamp assignment, re-activating an idle source would
> help!
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink does not cleanup some disk memory after submitting jar over rest

2021-04-09 Thread Till Rohrmann
Hi,

What you could also do is to create several heap dumps [1] whenever you
submit a new job. This could allow us to analyze whether there is something
increasing the heap memory consumption. Additionally, you could try to
upgrade your cluster to Flink 1.12.2 since we fixed some problems Maciek
mentioned.

[1] https://stackoverflow.com/a/3042463/4815083

Cheers,
Till

On Thu, Apr 8, 2021 at 9:15 PM Maciek Próchniak  wrote:

> Hi,
>
> don't know if this is the problem you're facing, but some time ago we
> encountered two issues connected to REST API and increased disk usage after
> each submission:
>
> https://issues.apache.org/jira/browse/FLINK-21164
>
> https://issues.apache.org/jira/browse/FLINK-9844
>
> - they're closed ATM, but only 1.12.2 contains the fixes.
>
>
> maciek
>
>
> On 08.04.2021 19:52, Great Info wrote:
>
> I have deployed my own flink setup in AWS ECS. One Service for JobManager
> and one Service for task Managers. I am running one ECS task for a job
> manager and 3 ecs tasks for TASK managers.
>
> I have a kind of batch job which I upload using flink rest every-day with
> changing new arguments, when I submit each time disk memory gets increased
> by ~ 600MB, I have given a checkpoint as S3 . Also I have set
> *historyserver.archive.clean-expired-jobs* true .
>
> Since I am running on ECS, I am not able to find why the memory is getting
> increased on every jar upload and execution .
>
> What are the flink config params I should look at to make sure the memory
> is not shooting up?
>
>


Re: Why does flink-quickstart-scala suggests adding connector dependencies in the default scope, while Flink Hive integration docs suggest the opposite

2021-04-09 Thread Yik San Chan
Thank you Till!

On Fri, Apr 9, 2021 at 4:25 PM Till Rohrmann  wrote:

> Hi Yik San,
>
> (1) You could do the same with Kafka. For Hive I believe that the
> dependency is simply quite large so that it hurts more if you bundle it
> with your user code.
>
> (2) If you change the content in the lib directory, then you have to
> restart the cluster.
>
> Cheers,
> Till
>
> On Fri, Apr 9, 2021 at 4:02 AM Yik San Chan 
> wrote:
>
>> Hi Till, I have 2 follow-ups.
>>
>> (1) Why is Hive special, while for connectors such as kafka, the docs
>> suggest simply bundling the kafka connector dependency with my user code?
>>
>> (2) it seems the document misses the "before you start the cluster" part
>> - does it always require a cluster restart whenever the /lib directory
>> changes?
>>
>> Thanks.
>>
>> Best,
>> Yik San
>>
>> On Fri, Apr 9, 2021 at 1:07 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Yik San,
>>>
>>> for future reference, I copy my answer from the SO here:
>>>
>>> The reason for this difference is that for Hive it is recommended to
>>> start the cluster with the respective Hive dependencies. The documentation
>>> [1] states that it's best to put the dependencies into the lib directory
>>> before you start the cluster. That way the cluster is enabled to run jobs
>>> which use Hive. At the same time, you don't have to bundle this dependency
>>> in the user jar which reduces its size. However, there shouldn't be
>>> anything preventing you from bundling the Hive dependency with your user
>>> code if you want to.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#dependencies
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Apr 8, 2021 at 11:41 AM Yik San Chan 
>>> wrote:
>>>
 The question is cross-posted on Stack Overflow
 https://stackoverflow.com/questions/67001326/why-does-flink-quickstart-scala-suggests-adding-connector-dependencies-in-the-de
 .

 ## Connector dependencies should be in default scope

 This is what [flink-quickstart-scala](
 https://github.com/apache/flink/blob/d12eeedfac6541c3a0711d1580ce3bd68120ca90/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml#L84)
 suggests:

 ```
 

 
 ```

 It also aligns with [Flink project configuration](
 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies
 ):

 > We recommend packaging the application code and all its required
 dependencies into one jar-with-dependencies which we refer to as the
 application jar. The application jar can be submitted to an already running
 Flink cluster, or added to a Flink application container image.
 >
 > Important: For Maven (and other build tools) to correctly package the
 dependencies into the application jar, these application dependencies must
 be specified in scope compile (unlike the core dependencies, which must be
 specified in scope provided).

 ## Hive connector dependencies should be in provided scope

 However, [Flink Hive Integration docs](
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#program-maven)
 suggests the opposite:

 > If you are building your own program, you need the following
 dependencies in your mvn file. It’s recommended not to include these
 dependencies in the resulting jar file. You’re supposed to add dependencies
 as stated above at runtime.

 ## Why?

 Thanks!

 Best,
 Yik San

>>>


getMetricGroup is not supported when optimizing

2021-04-09 Thread abc15606

使用flink 1.11.2
启动任务的时候报

getMetricGroup is not supported when optimizing
请问各位大佬,怎么解决好?
发自我的iPhone

Re: Flink 1.13 and CSV (batch) writing

2021-04-09 Thread Flavio Pompermaier
And another thing: in my csv I added ',bye' (to test null Long values) but
I get a parse error..if I add  'csv.null-literal' = '' it seems to work..is
that the right way to solve this problem?

On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier 
wrote:

> Thanks Kurt, now it works. However I can't find a way to skip the CSV
> header..before there was  "format.ignore-first-line" but now I can't find
> another way to skip it.
> I could set csv.ignore-parse-errors to true but then I can't detect other
> parsing errors, otherwise I need to manually transofrm the header into a
> comment adding the # character at the start of the line..
> How can I solve that?
>
> On Fri, Apr 9, 2021 at 4:07 AM Kurt Young  wrote:
>
>> My DDL is:
>>
>> CREATE TABLE csv (
>>id BIGINT,
>>name STRING
>> ) WITH (
>>'connector' = 'filesystem',
>>'path' = '.',
>>'format' = 'csv'
>> );
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young  wrote:
>>
>>> Hi Flavio,
>>>
>>> We would recommend you to use new table source & sink interfaces, which
>>> have different
>>> property keys compared to the old ones, e.g. 'connector' v.s.
>>> 'connector.type'.
>>>
>>> You can follow the 1.12 doc [1] to define your csv table, everything
>>> should work just fine.
>>>
>>> *Flink SQL> set table.dml-sync=true;*
>>>
>>> *[INFO] Session property has been set.*
>>>
>>>
>>> *Flink SQL> select * from csv;*
>>>
>>> *+--+--+*
>>>
>>> *|   id | name |*
>>>
>>> *+--+--+*
>>>
>>> *|3 |c |*
>>>
>>> *+--+--+*
>>>
>>> *Received a total of 1 row*
>>>
>>>
>>> *Flink SQL> insert overwrite csv values(4, 'd');*
>>>
>>> *[INFO] Submitting SQL update statement to the cluster...*
>>>
>>> *[INFO] Execute statement in sync mode. Please wait for the execution
>>> finish...*
>>>
>>> *[INFO] Complete execution of the SQL update statement.*
>>>
>>>
>>> *Flink SQL> select * from csv;*
>>>
>>> *+--+--+*
>>>
>>> *|   id | name |*
>>>
>>> *+--+--+*
>>>
>>> *|4 |d |*
>>>
>>> *+--+--+*
>>>
>>> *Received a total of 1 row*
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier 
>>> wrote:
>>>
 Hi Till,
 since I was using the same WITH-clause both for reading and writing I
 discovered that overwrite is actually supported in the Sinks, while in the
 Sources an exception is thrown (I was thinking that those properties were
 simply ignored).
 However the quote-character is not supported in the sinks: is this a
 bug or is it the intended behaviour?.
 Here is a minimal example that reproduce the problem (put in the
 /tmp/test.csv something like '1,hello' or '2,hi').

 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;

 public class FlinkCsvTest {
   public static void main(String[] args) throws Exception {
 final EnvironmentSettings envSettings =

 EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
 final TableEnvironment tableEnv =
 TableEnvironment.create(envSettings);
 // ExecutionEnvironment env =
 ExecutionEnvironment.getExecutionEnvironment();
 // BatchTableEnvironment tableEnv =
 BatchTableEnvironment.create(env);
 final String tableInName = "testTableIn";
 final String createInTableDdl = getSourceDdl(tableInName,
 "/tmp/test.csv"); //

 final String tableOutName = "testTableOut";
 final String createOutTableDdl = getSinkDdl(tableOutName,
 "/tmp/test-out.csv"); //
 tableEnv.executeSql(createInTableDdl);
 tableEnv.executeSql(createOutTableDdl);

 Table tableIn = tableEnv.from(tableInName);
 Table tableOut = tableEnv.from(tableOutName);
 tableIn.insertInto(tableOutName);
 // tableEnv.toDataSet(table, Row.class).print();
 tableEnv.execute("TEST read/write");

   }

   private static String getSourceDdl(String tableName, String filePath)
 {
 return "CREATE TABLE " + tableName + " (\n" + //
 " `id` BIGINT,\n" + //
 " `name` STRING) WITH (\n" + //
 " 'connector.type' = 'filesystem',\n" + //
 " 'connector.property-version' = '1',\n" + //
 " 'connector.path' = '" + filePath + "',\n" + //
 " 'format.type' = 'csv',\n" + //
 " 'format.field-delimiter' = ',',\n" + //

Re: Why does flink-quickstart-scala suggests adding connector dependencies in the default scope, while Flink Hive integration docs suggest the opposite

2021-04-09 Thread Till Rohrmann
Hi Yik San,

(1) You could do the same with Kafka. For Hive I believe that the
dependency is simply quite large so that it hurts more if you bundle it
with your user code.

(2) If you change the content in the lib directory, then you have to
restart the cluster.

Cheers,
Till

On Fri, Apr 9, 2021 at 4:02 AM Yik San Chan 
wrote:

> Hi Till, I have 2 follow-ups.
>
> (1) Why is Hive special, while for connectors such as kafka, the docs
> suggest simply bundling the kafka connector dependency with my user code?
>
> (2) it seems the document misses the "before you start the cluster" part -
> does it always require a cluster restart whenever the /lib directory
> changes?
>
> Thanks.
>
> Best,
> Yik San
>
> On Fri, Apr 9, 2021 at 1:07 AM Till Rohrmann  wrote:
>
>> Hi Yik San,
>>
>> for future reference, I copy my answer from the SO here:
>>
>> The reason for this difference is that for Hive it is recommended to
>> start the cluster with the respective Hive dependencies. The documentation
>> [1] states that it's best to put the dependencies into the lib directory
>> before you start the cluster. That way the cluster is enabled to run jobs
>> which use Hive. At the same time, you don't have to bundle this dependency
>> in the user jar which reduces its size. However, there shouldn't be
>> anything preventing you from bundling the Hive dependency with your user
>> code if you want to.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#dependencies
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 8, 2021 at 11:41 AM Yik San Chan 
>> wrote:
>>
>>> The question is cross-posted on Stack Overflow
>>> https://stackoverflow.com/questions/67001326/why-does-flink-quickstart-scala-suggests-adding-connector-dependencies-in-the-de
>>> .
>>>
>>> ## Connector dependencies should be in default scope
>>>
>>> This is what [flink-quickstart-scala](
>>> https://github.com/apache/flink/blob/d12eeedfac6541c3a0711d1580ce3bd68120ca90/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml#L84)
>>> suggests:
>>>
>>> ```
>>> 
>>>
>>> 
>>> ```
>>>
>>> It also aligns with [Flink project configuration](
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies
>>> ):
>>>
>>> > We recommend packaging the application code and all its required
>>> dependencies into one jar-with-dependencies which we refer to as the
>>> application jar. The application jar can be submitted to an already running
>>> Flink cluster, or added to a Flink application container image.
>>> >
>>> > Important: For Maven (and other build tools) to correctly package the
>>> dependencies into the application jar, these application dependencies must
>>> be specified in scope compile (unlike the core dependencies, which must be
>>> specified in scope provided).
>>>
>>> ## Hive connector dependencies should be in provided scope
>>>
>>> However, [Flink Hive Integration docs](
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#program-maven)
>>> suggests the opposite:
>>>
>>> > If you are building your own program, you need the following
>>> dependencies in your mvn file. It’s recommended not to include these
>>> dependencies in the resulting jar file. You’re supposed to add dependencies
>>> as stated above at runtime.
>>>
>>> ## Why?
>>>
>>> Thanks!
>>>
>>> Best,
>>> Yik San
>>>
>>


Re: Flink 1.13 and CSV (batch) writing

2021-04-09 Thread Flavio Pompermaier
Thanks Kurt, now it works. However I can't find a way to skip the CSV
header..before there was  "format.ignore-first-line" but now I can't find
another way to skip it.
I could set csv.ignore-parse-errors to true but then I can't detect other
parsing errors, otherwise I need to manually transofrm the header into a
comment adding the # character at the start of the line..
How can I solve that?

On Fri, Apr 9, 2021 at 4:07 AM Kurt Young  wrote:

> My DDL is:
>
> CREATE TABLE csv (
>id BIGINT,
>name STRING
> ) WITH (
>'connector' = 'filesystem',
>'path' = '.',
>'format' = 'csv'
> );
>
> Best,
> Kurt
>
>
> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young  wrote:
>
>> Hi Flavio,
>>
>> We would recommend you to use new table source & sink interfaces, which
>> have different
>> property keys compared to the old ones, e.g. 'connector' v.s.
>> 'connector.type'.
>>
>> You can follow the 1.12 doc [1] to define your csv table, everything
>> should work just fine.
>>
>> *Flink SQL> set table.dml-sync=true;*
>>
>> *[INFO] Session property has been set.*
>>
>>
>> *Flink SQL> select * from csv;*
>>
>> *+--+--+*
>>
>> *|   id | name |*
>>
>> *+--+--+*
>>
>> *|3 |c |*
>>
>> *+--+--+*
>>
>> *Received a total of 1 row*
>>
>>
>> *Flink SQL> insert overwrite csv values(4, 'd');*
>>
>> *[INFO] Submitting SQL update statement to the cluster...*
>>
>> *[INFO] Execute statement in sync mode. Please wait for the execution
>> finish...*
>>
>> *[INFO] Complete execution of the SQL update statement.*
>>
>>
>> *Flink SQL> select * from csv;*
>>
>> *+--+--+*
>>
>> *|   id | name |*
>>
>> *+--+--+*
>>
>> *|4 |d |*
>>
>> *+--+--+*
>>
>> *Received a total of 1 row*
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier 
>> wrote:
>>
>>> Hi Till,
>>> since I was using the same WITH-clause both for reading and writing I
>>> discovered that overwrite is actually supported in the Sinks, while in the
>>> Sources an exception is thrown (I was thinking that those properties were
>>> simply ignored).
>>> However the quote-character is not supported in the sinks: is this a bug
>>> or is it the intended behaviour?.
>>> Here is a minimal example that reproduce the problem (put in the
>>> /tmp/test.csv something like '1,hello' or '2,hi').
>>>
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.TableEnvironment;
>>>
>>> public class FlinkCsvTest {
>>>   public static void main(String[] args) throws Exception {
>>> final EnvironmentSettings envSettings =
>>>
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>> final TableEnvironment tableEnv =
>>> TableEnvironment.create(envSettings);
>>> // ExecutionEnvironment env =
>>> ExecutionEnvironment.getExecutionEnvironment();
>>> // BatchTableEnvironment tableEnv =
>>> BatchTableEnvironment.create(env);
>>> final String tableInName = "testTableIn";
>>> final String createInTableDdl = getSourceDdl(tableInName,
>>> "/tmp/test.csv"); //
>>>
>>> final String tableOutName = "testTableOut";
>>> final String createOutTableDdl = getSinkDdl(tableOutName,
>>> "/tmp/test-out.csv"); //
>>> tableEnv.executeSql(createInTableDdl);
>>> tableEnv.executeSql(createOutTableDdl);
>>>
>>> Table tableIn = tableEnv.from(tableInName);
>>> Table tableOut = tableEnv.from(tableOutName);
>>> tableIn.insertInto(tableOutName);
>>> // tableEnv.toDataSet(table, Row.class).print();
>>> tableEnv.execute("TEST read/write");
>>>
>>>   }
>>>
>>>   private static String getSourceDdl(String tableName, String filePath) {
>>> return "CREATE TABLE " + tableName + " (\n" + //
>>> " `id` BIGINT,\n" + //
>>> " `name` STRING) WITH (\n" + //
>>> " 'connector.type' = 'filesystem',\n" + //
>>> " 'connector.property-version' = '1',\n" + //
>>> " 'connector.path' = '" + filePath + "',\n" + //
>>> " 'format.type' = 'csv',\n" + //
>>> " 'format.field-delimiter' = ',',\n" + //
>>>  //   " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
>>> " 'format.property-version' = '1',\n" + //
>>> " 'format.quote-character' = '\"',\n" + //
>>> " 'format.ignore-first-line' = 'false'" + //
>>> ")";
>>>   }
>>>
>>>   private static String getSinkDdl(String tableName, String filePath) {
>>> return "CREATE TABLE " + tableName + " (\n" + //
>>>  

Re: SinkFunction与OutputFormat选择哪个?

2021-04-09 Thread Qishang
Hi Luna Wong.

RichOutputFormat 实现的最终是由 Flink 提供的 OutputFormatSinkFunction 再包装成
SinkFunction。 OutputFormatSinkFunction 很早就 Deprecated 了,没有实现
CheckpointedFunction 。
jdbc 的是实现了 RichOutputFormat ,但是最后用 GenericJdbcSinkFunction 包装了一次,
GenericJdbcSinkFunction 实现了 CheckpointedFunction,
刚好最近遇到 https://issues.apache.org/jira/browse/FLINK-20552 ,可以看下这个 BUG 的修复。

1.12 之前 :
org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink
https://github.com/apache/flink/blob/release-1.12/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala#L97-L101
最新 1.13
: org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L153-L163

Luna Wong  于2021年4月8日周四 下午5:42写道:

> 自定义Connector时,RichSinkFunction和RichOutputFormat可以选择一个进行实现,那么作为开发者应该如何选择呢?
>


Re: Flink Metrics emitted from a Kubernetes Application Cluster

2021-04-09 Thread Chesnay Schepler

This is currently not possible. See also FLINK-8358

On 4/9/2021 4:47 AM, Claude M wrote:

Hello,

I've setup Flink as an Application Cluster in Kubernetes. Now I'm 
looking into monitoring the Flink cluster in Datadog. This is what is 
configured in the flink-conf.yaml to emit metrics:


metrics.scope.jm : flink.jobmanager
metrics.scope.jm.job: flink.jobmanager.job
metrics.scope.tm : flink.taskmanager
metrics.scope.tm.job: flink.taskmanager.job
metrics.scope.task: flink.task
metrics.scope.operator: flink.operator
metrics.reporter.dghttp.class: 
org.apache.flink.metrics.datadog.DatadogHttpReporter

metrics.reporter.dghttp.apikey: {{ datadog_api_key }}
metrics.reporter.dghttp.tags: environment: {{ environment }}

When it gets to Datadog though, the metrics for the flink.jobmanager 
and flink.taskmanager is filtered by the host which is the Pod IP.  
However, I would like it to use the pod name.  How can this be 
accomplished?



Thanks





Re: Flink-SQL合并多条记录到Map中

2021-04-09 Thread RL_LEE
我使用UDAF的方式解决了



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink消费kafka数据open()方法初始换多次的问题

2021-04-09 Thread Qishang
Hi guoxb.
没有全部代码,我猜你 addSink() 走了两次,调试看下。

guoxb__...@sina.com  于2021年4月9日周五 下午2:36写道:

> hi:
>情景:
> 我在用flink通过FlinkKafkaConsumer消费kafka的数据并写入到mysql的时候,在sink端我继承
> RichSinkFunction ,并重写了open(),close()方法,以及实现了invoke(),方法
> 个人理解:
> 1. open()方法在程序启动的时候只走一次,我在该方法中初始化了数据库连接
> 2. close()方法在程序结束的时候也是只走一次
> 3. invoke()方法在获取到每一条数据走一次这个方法
> 实际情况及问题(env.setParallelism(1)):
> 1. open()在程序启动的时候运行了两次
> 2. invoke()方法在每条消息过来也会被处理两次
>
> code:
> reader端:
> ```java
> public class FlinkKafkaReader extends
> DataKafkaConnect {
>
> @Override
> protected DataStream reader(StreamExecutionEnvironment env,
> KafkaConfig cfg) throws JobException {
>
> DataStream stream = null;
> try {
> Properties kafkaProps = new Properties();
> kafkaProps.setProperty("bootstrap.servers",
> cfg.getBootstrapServers());
> kafkaProps.setProperty("group.id", cfg.getGroupId());
> kafkaProps.setProperty("auto.offset.reset",
> cfg.getOffsetReset());
> kafkaProps.setProperty("key.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
> kafkaProps.setProperty("value.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
> kafkaProps.setProperty("enable.auto.commit",
> cfg.getAutoCommit());
> kafkaProps.setProperty("max.poll.interval.ms",
> cfg.getIntervalMs());
>
> KafkaDeserializationSchema deserializationKdl = null;
> // 根据不同的配置进行选择不同的消息解析器
> switch (cfg.getMessageType()) {
> case "mysql":
> deserializationKdl = new
> KafkaDataDeserialiaztionBinlogSchemal();
> break;
> case "mongodb":
> deserializationKdl = new
> KafkaDataDeserialiaztionOplogSchema();
> break;
> }
> FlinkKafkaConsumer flinkKafkaConsumer = new
> FlinkKafkaConsumer(Arrays.asList(cfg.getTopics().split(",")),
> deserializationKdl, kafkaProps);
>
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> // 如果设置了消费的开始offset时间,则从指定的时间点开始会消费,否则从当前时间点开始消费
> String consumptionStartOffset = cfg.getConsumptionStartTime();
> if (StringUtils.isBlank(consumptionStartOffset)) {
> flinkKafkaConsumer.setStartFromGroupOffsets();
> } else {
> flinkKafkaConsumer.setStartFromTimestamp(
> new SimpleDateFormat("-MM-dd HH:mm:ss")
> .parse(cfg.getConsumptionStartTime())
> .getTime()
> );
> }
> // 设置并行度
> env.setParallelism(1);
> //env.getCheckpointConfig().setCheckpointInterval(1000 * 30);
>
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> // 设置可容忍checkpoint失败的次数
>
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
> stream = env.addSource(flinkKafkaConsumer)
> .filter(new FilterFunction() {
> @Override
> public boolean filter(Object value) throws
> Exception {
> return null != value;
> }
> });
> } catch (Exception e) {
> throw new JobException(e.getMessage());
> }
> return stream;
> }
> }
> ```
> sink端:
> ```java
> public class MysqlSink extends RichSinkFunction {
> @Override
> public void open(Configuration config) throw Exception{
> ...
> }
> @Override
> public void close(){
> ...
> }
> @Override
> public void invoke(Object obj,Context context){
> //业务逻辑,这里的逻辑每一条数据过来会运行两次,这里也是我的问题
> ...
> }
> }
> ```
>
> 还请知悉原因的道友给点指引,万分感谢
>
>
> guoxb__...@sina.com
>


?????? Flink 1.12.2 sql api use parquet format error

2021-04-09 Thread ??????
After change pom.xml, new error:


org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute 
application. at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)
 at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) 
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) Caused by: 
java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: 
org/apache/hadoop/conf/Configuration at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
 at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
 ... 7 more Caused by: java.lang.NoClassDefFoundError: 
org/apache/hadoop/conf/Configuration at 
org.apache.flink.formats.parquet.ParquetFileFormatFactory.getParquetConfiguration(ParquetFileFormatFactory.java:115)
 at 
org.apache.flink.formats.parquet.ParquetFileFormatFactory.access$000(ParquetFileFormatFactory.java:51)
 at 
org.apache.flink.formats.parquet.ParquetFileFormatFactory$2.createRuntimeEncoder(ParquetFileFormatFactory.java:103)
 at 
org.apache.flink.formats.parquet.ParquetFileFormatFactory$2.createRuntimeEncoder(ParquetFileFormatFactory.java:97)
 at 
org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:373)
 at 
org.apache.flink.table.filesystem.FileSystemTableSink.createStreamingSink(FileSystemTableSink.java:183)
 at 
org.apache.flink.table.filesystem.FileSystemTableSink.consume(FileSystemTableSink.java:145)
 at 
org.apache.flink.table.filesystem.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:134)
 at 
org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:103)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
 at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
 at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
 at com.jd.app.StreamingJob.main(StreamingJob.java:146) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 

Re: Proper way to get DataStream

2021-04-09 Thread Arvid Heise
Hi Maminspapin,

I just answered another question similarly, so let me just c it here:

The beauty of Avro lies in having reader and writer schema and schema
compatibility, such that if your schema evolves over time (which will
happen in streaming naturally but is also very common in batch), you can
still use your application as is without modification. For streaming, this
methodology also implies that you can process elements with different
schema versions in the same run, which is mandatory for any non-toy example.

If you read into this topic, you will realize that it doesn't make sense to
read from Avro without specifying your reader schema (except for some
generic applications, but they should be written in DataStream). If you
keep in mind that your same dataset could have different schemas, you will
notice that your ideas quickly reach some limitations (which schema to
take?). What you could do, is to write a small script to generate the
schema DDL from your current schema in your actual data if you have very
many columns and datasets. It certainly would also be an interesting idea
to pass a static Avro/Json schema to the DDL.

Note that in KafkaStreams, you have the same issue. You usually generate
your Java classes from some schema version, which will become your reader
schema. You can and should do the same in Flink. Please read [1] for more
information.

[1] https://www.baeldung.com/java-apache-avro#read-schema

On Sun, Apr 4, 2021 at 4:21 PM Maminspapin  wrote:

> Hi, @Arvid Heise-4, @Matthias
>
> I'm very appreciate for your attention, guys. And sorry for my late reply.
>
> Yes, Arvid, you are right, the second way in fact works. I coppied schema
> from Schema Registry using it's API and created the .avsc format file. And
> thanks again for explaining me why the first way is not compatible.
>
> So, my code to define schema is (I don't know is it good decision...):
>
> Path path = Paths.get("path_to_schema/schema.avsc");
> String content = new String(Files.readAllBytes(path));
> Schema schema = new Schema.Parser().parse(content);
>
> And it really works.
>
> But, I don't understand why should I use two schemas:
> 1. schema I created (reader schema)
> 2. schema I get with SR url (writer schema)
>
> I have some expirience with KafkaStreams lib and using it there is no need
> to get reader schema. There is one service to communicate with schemas -
> it's Schema Registry. Why not to use single source to get schema in Flink?
>
>
> Again, the second way is correct, and I can to go farther with my program.
>
> Thanks.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Avro schema

2021-04-09 Thread Arvid Heise
Hi Sumeet,

The beauty of Avro lies in having reader and writer schema and schema
compatibility, such that if your schema evolves over time (which will
happen in streaming naturally but is also very common in batch), you can
still use your application as is without modification. For streaming, this
methodology also implies that you can process elements with different
schema versions in the same run, which is mandatory for any non-toy example.

If you read into this topic, you will realize that it doesn't make sense to
read from Avro without specifying your reader schema (except for some
generic applications, but they should be written in DataStream). If you
keep in mind that your same dataset could have different schemas, you will
notice that your ideas quickly reach some limitations (which schema to
take?). What you could do, is to write a small script to generate the
schema DDL from your current schema in your actual data if you have very
many columns and datasets. It certainly would also be an interesting idea
to pass a static Avro/Json schema to the DDL.

On Fri, Apr 2, 2021 at 10:57 AM Paul Lam  wrote:

> Hi Sumeet,
>
> I’m not a Table/SQL API expert, but from my knowledge, it’s not viable to
> derived SQL table schemas from Avro schemas, because table schemas would be
> the ground truth by design.
> Moreover, one Avro type can be mapped to multiple Flink types, so in
> practice maybe it’s also not viable.
>
> Best,
> Paul Lam
>
> 2021年4月2日 11:34,Sumeet Malhotra  写道:
>
> Just realized, my question was probably not clear enough. :-)
>
> I understand that the Avro (or JSON for that matter) format can be
> ingested as described here:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#apache-avro-format,
> but this still requires the entire table specification to be written in the
> "CREATE TABLE" section. Is it possible to just specify the Avro schema and
> let Flink map it to an SQL table?
>
> BTW, the above link is titled "Table API Legacy Connectors", so is this
> still supported? Same question for YAML specification.
>
> Thanks,
> Sumeet
>
> On Fri, Apr 2, 2021 at 8:26 AM Sumeet Malhotra 
> wrote:
>
>> Hi,
>>
>> Is it possible to directly import Avro schema while ingesting data into
>> Flink? Or do we always have to specify the entire schema in either SQL DDL
>> for Table API or using DataStream data types? From a code maintenance
>> standpoint, it would be really helpful to keep one source of truth for the
>> schema somewhere.
>>
>> Thanks,
>> Sumeet
>>
>
>


Re: yarn-per-job模式下,sql-client如何指定提交yarn的资源队列

2021-04-09 Thread guanxianchun
-yqu yarn_queue_name



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink消费kafka数据open()方法初始换多次的问题

2021-04-09 Thread guoxb__...@sina.com
hi:
   情景:
我在用flink通过FlinkKafkaConsumer消费kafka的数据并写入到mysql的时候,在sink端我继承 
RichSinkFunction ,并重写了open(),close()方法,以及实现了invoke(),方法
个人理解:
1. open()方法在程序启动的时候只走一次,我在该方法中初始化了数据库连接
2. close()方法在程序结束的时候也是只走一次
3. invoke()方法在获取到每一条数据走一次这个方法   
实际情况及问题(env.setParallelism(1)):
1. open()在程序启动的时候运行了两次
2. invoke()方法在每条消息过来也会被处理两次

code:
reader端:
```java
public class FlinkKafkaReader extends 
DataKafkaConnect {

@Override
protected DataStream reader(StreamExecutionEnvironment env, KafkaConfig 
cfg) throws JobException {

DataStream stream = null;
try {
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", 
cfg.getBootstrapServers());
kafkaProps.setProperty("group.id", cfg.getGroupId());
kafkaProps.setProperty("auto.offset.reset", cfg.getOffsetReset());
kafkaProps.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.setProperty("enable.auto.commit", cfg.getAutoCommit());
kafkaProps.setProperty("max.poll.interval.ms", cfg.getIntervalMs());

KafkaDeserializationSchema deserializationKdl = null;
// 根据不同的配置进行选择不同的消息解析器
switch (cfg.getMessageType()) {
case "mysql":
deserializationKdl = new 
KafkaDataDeserialiaztionBinlogSchemal();
break;
case "mongodb":
deserializationKdl = new 
KafkaDataDeserialiaztionOplogSchema();
break;
}
FlinkKafkaConsumer flinkKafkaConsumer = new 
FlinkKafkaConsumer(Arrays.asList(cfg.getTopics().split(",")), 
deserializationKdl, kafkaProps);


env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 如果设置了消费的开始offset时间,则从指定的时间点开始会消费,否则从当前时间点开始消费
String consumptionStartOffset = cfg.getConsumptionStartTime();
if (StringUtils.isBlank(consumptionStartOffset)) {
flinkKafkaConsumer.setStartFromGroupOffsets();
} else {
flinkKafkaConsumer.setStartFromTimestamp(
new SimpleDateFormat("-MM-dd HH:mm:ss")
.parse(cfg.getConsumptionStartTime())
.getTime()
);
}
// 设置并行度
env.setParallelism(1);
//env.getCheckpointConfig().setCheckpointInterval(1000 * 30);


env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 设置可容忍checkpoint失败的次数
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
stream = env.addSource(flinkKafkaConsumer)
.filter(new FilterFunction() {
@Override
public boolean filter(Object value) throws Exception {
return null != value;
}
});
} catch (Exception e) {
throw new JobException(e.getMessage());
}
return stream;
}
}
```
sink端:
```java
public class MysqlSink extends RichSinkFunction {
@Override
public void open(Configuration config) throw Exception{
...
}
@Override
public void close(){
...
}
@Override
public void invoke(Object obj,Context context){
//业务逻辑,这里的逻辑每一条数据过来会运行两次,这里也是我的问题
...
}
}
```
 
还请知悉原因的道友给点指引,万分感谢


guoxb__...@sina.com


Re: flink cdc读取Maxwell格式的binlog,如何获取表名等元信息

2021-04-09 Thread Qishang
Hi chen310.
Canal 和 debezium 已经实现,Maxwell 还没有完成,可以关注下
https://issues.apache.org/jira/browse/FLINK-20926

chen310 <1...@163.com> 于2021年4月8日周四 下午5:35写道:

> 请教下,看了这篇文章https://developer.aliyun.com/article/771438,flink-cdc 读取mysql
> Maxwell 格式binlog,怎么在flink 源表上获取mysql表名,通过这样的方式并没有生效  `origin_table` STRING
> METADATA FROM 'value.source.table' VIRTUAL,
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t572/lALPDg7mQSzBJifNAmzNBpY_1686_620.png>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Organizing Flink Applications: Mono repo or polyrepo

2021-04-09 Thread Arvid Heise
Hi Bin,

I would put Flink applications into separate repos. It reduces compile
times and makes automatic deployment much easier (if you update
master/release branch of application X, you simply deploy it - potentially
with some manual trigger in your CI/CD pipeline) . You can also easily bump
Flink versions for applications that benefit from new features.

If you have a large amount of shared code between your Flink applications
and they have similar life cycles (e.g. a kind of strongly coupled), then
it makes sense to put these specific applications into the same repo. A
smaller amount of shared code should reside in its own repo.

Naturally, if you have one code base for several Flink applications (e.g.
generic application that is configured), then you have only one repo. For
example, you have a generic engine to execute some SQL query on any dataset
and only need a bit of yaml to configure it, you would have one application
with X deployments. Usually, upon updating the generic application, you
want to refresh all deployments.

TL;DR only put in things into the same repo, if they share the life-cycle
to keep deployment simple.

I hope this helps,

Arvid

On Wed, Mar 31, 2021 at 3:49 AM Xinbin Huang  wrote:

> Hi community
>
> I am curious about people's experience in structuring Flink applications.
> Do you use a mono repo structure (multiple applications in one single repo)
> or broken down each application into its own repo?
>
> If possible, can you share some of your thoughts on the pros/cons of each
> approach?
>
> Thanks
> Bin
>


Re: Async + Broadcast?

2021-04-09 Thread Arvid Heise
Hi Alex,

The easiest way to verify if what you tried is working out is to look at
Flink's Web UI and check the topology.

The broadcast side of the input will always be ... well broadcasted (=not
chained). So you need to disable chaining only on the non-broadcasted
dataset.

val parsed: DataStream = dataSource
  .disableChaining()
  .connect(configBroadcast)
  .process(Parse(initialConfigs))

Regarding objectReuse, it's safe to enable if you don't do any dirty hacks
on data that has been output already. So what you cannot do is, store the
last element in your map function (without managed state) and use that to
calculate the new result.

On Fri, Apr 9, 2021 at 1:13 AM Alex Cruise  wrote:

> Thanks Arvid! I'm not completely clear on where to apply your suggestions.
>
> I've included a sketch of my job below, and I have a couple questions:
>
> 1. It looks like enableObjectReuse() is a global setting, should I worry
> about whether I'm using any mutable data between stages?
> 2. Should I disableChaining() on BOTH broadcast-dependent stages, or just
> the one immediately preceding the async?
>
> Thanks!
>
> -0xe1a
>
> *Types:*
>
> /** all the configs for a given tenant, as of the time when a change was
> observed */
> data class ConfigSnapshot(
>   tenantId: Long,
>   timestamp: Instant,
>   configs: Map
> )
>
> /** parse raw strings from input, rejecting those for unconfigured tenants
> */
> class Parse(
>   initialConfigs: Map
> ) : BroadcastProcessFunction {
>   override fun processBroadcastElement(
> value: ConfigSnapshot,
> ctx: Context,
> out: Collector
>   ) {
> val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
> snapshots.put(value.tenantId, value)
>   }
>
>   override fun processElement(value: String, ctx: ReadOnlyContext, out:
> Collector) {
> val validTenantIds = ctx.getBroadcastState(configSnapshotDescriptor)
>   .toMap()
>   .keys
>   .ifEmpty { initialConfigs.keys }
>
> val parsed = Record(value)
> if (!validTenantIds.contains(parsed.tenantId)) {
>   return
> } else {
>   out.collect(parsed)
> }
>   }
> }
>
> /** given a parsed record, identity which config(s) are interested in it,
> and send an output value of the record tupled with the interested config */
> class ValidateAndDistribute(
>   initialConfigs: Map
> ) : BroadcastProcessFunction>
> {
>   override fun processBroadcastElement(
> value: ConfigSnapshot,
> ctx: Context,
> out: Collector>
>   ) {
> val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
> snapshots.put(value.tenantId, value)
>   }
>
>   override fun processElement(
> value: Record,
> ctx: ReadOnlyContext,
> out: Collector>
>   ) {
> val configsForThisTenant =
> ctx.getBroadcastState(configSnapshotDescriptor)
>   .toMap()
>   .ifEmpty { initialConfigs }
>   .get(value.tenantId)
>   .configs
>   .orEmpty()
>
> val configsInterestedInThisRecord = configsForThisTenant.values.filter
> {
>   it.interestedIn(value)
> }
>
> for ((configId, config) in configsInterestedInThisRecord) {
>   out.collect(value to config)
> }
>   }
> }
>
> /** given a pair of Record and Config, run the async operation and send an
> enriched record including the result */
> class Enrich() : RichAsyncFunction, EnrichedRecord>
>
> *Job Pseudocode:*
>
> val initialConfigs: Map = ???
> val dataSource: DataStream = ???
> val configSource: DataStream = ??? // from a legacy "while
> (true) { poll; sleep }" source
>
> // the config-subscribing operators keep the broadcast state in a
> Map
> val configSnapshotDescriptor = MapStateDescriptor(
>   "currentConfigSnapshots",
>   Long::class.java,
>   ConfigSnapshot::class.java
> )
>
> // Broadcast the snapshots
> val configBroadcast: BroadcastStream =
> configSource.broadcast(configSnapshotDescriptor)
>
> val parsed: DataStream = dataSource
>   .connect(configBroadcast)
>   .process(Parse(initialConfigs))
>
> // input records can be duplicated now, as there may be multiple Configs
> that are interested in a record
> val validated: DataStream> = parsed
>   .connect(configBroadcast)
>   .process(ValidateAndDistribute(initialConfigs))
>
> val enriched: DataStream = AsyncDataStream.unorderedWait(
>   validated,
>   Enrich(),
>   5L,
>   TimeUnit.SECONDS
> )
>
>
>
>
>
> On Wed, Apr 7, 2021 at 11:28 PM Arvid Heise  wrote:
>
>> Hi Alex,
>>
>> your approach is completely valid. What you want to achieve is that you
>> have a chain between your state managing operator and the consuming async
>> operations. In that way, you have no serialization overhead.
>>
>> To achieve that you want to
>> - use Flink 1.11+ [1]
>> - make sure that if you have a legacy source, you disableChaining before
>> your state managing operator as asyncIO cannot be (transitively) chained to
>> legacy sources. So it should be source -> ... -> (forward channel) ->
>> (state managing operator -> async1 -> async2 -> ... ) ... -> sink
>> -