Re: FLINK Kinesis consumer Checkpointing data loss

2021-04-08 Thread Arvid Heise
Hi Vijay,

if you don't specify a checkpoint, then Flink assumes you want to start
from scratch (e.g., you had a bug in your business logic and need to start
completely without state).

If there is any failure and Flink restarts automatically, it will always
pick up from the latest checkpoint [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#recovery
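
For completeness: resuming from a retained (externalized) checkpoint works the same way as
resuming from a savepoint - you pass its path to the CLI with -s. A minimal sketch, with the
bucket, job id and jar name as placeholders:

  # pick the latest chk-<n> directory under the configured checkpoint path
  bin/flink run -s s3://<bucket>/flink/checkpoint/<job-id>/chk-<n>/ -d my-streaming-job.jar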

On Thu, Apr 8, 2021 at 11:08 PM Vijayendra Yadav 
wrote:

> Thanks it was working fine with: bin/flink run  -s
> s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/
> \
>
> On Thu, Apr 8, 2021 at 11:42 AM Vijayendra Yadav 
> wrote:
>
>> Hi Arvid,
>>
>> Thanks for your response. I did not restart from the checkpoint. I
>> assumed Flink would look for a checkpoint upon restart automatically.
>>
>> *Should I restart like below?*
>>
>> bin/flink run  -s
>> s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/
>> \
>>
>> Thanks,
>> Vijay
>>
>> On Thu, Apr 8, 2021 at 12:52 AM Arvid Heise  wrote:
>>
>>> Hi Vijay,
>>>
>>> edit: After re-reading your message: are you sure that you restart from
>>> a checkpoint/savepoint? If you just start the application anew and use
>>> LATEST initial position, this is the expected behavior.
>>>
>>> --- original intended answer if you restart from checkpoint
>>>
>>> this is definitely not the expected behavior.
>>>
>>> To exclude certain error sources:
>>> - Could you double-check if this is also happening if you don't use
>>> unaligned checkpoints? (I don't really think this is because of unaligned
>>> checkpoint, but it's better to be sure and we want to reduce the possible
>>> error sources)
>>> - Can you see the missing messages still in Kinesis?
>>> - Could you extract all log INFO statements from
>>> org.apache.flink.streaming.connectors.kinesis and attach them here?
>>> - How long did you wait with recovery?
>>>
>>>
>>>
>>> On Wed, Apr 7, 2021 at 8:03 PM Vijayendra Yadav 
>>> wrote:
>>>
 Hi Team,

 We are trying to make sure we are not losing data when KINESIS Consumer
 is down.

 Kinesis streaming Job which has following checkpointing properties:


 // checkpoint every X msecs
 env.enableCheckpointing(Conf.getFlinkCheckpointInterval());

 // enable externalized checkpoints which are retained after job cancellation
 env.getCheckpointConfig().enableExternalizedCheckpoints(
     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

 // allow job recovery fallback to checkpoint when there is a more recent savepoint
 env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());

 // enables the experimental unaligned checkpoints
 env.getCheckpointConfig().enableUnalignedCheckpoints();

 // checkpoint path
 env.setStateBackend(new FsStateBackend(Conf.getFlinkCheckPointPath(), true));

 1) We killed the Kinesis Job
 2) Sent messages to KDS while Consumer was down.
 3) Restarted Flink Consumer, *messages which were sent during the
 Consumer down period, never ingested (data loss).*
 4) Re-sent messages to KDS while the consumer was still up. Messages
 did ingest fine.

 *How can I avoid data loss for #3 ??*

 From Logs:


 2021-04-07 12:15:49,161 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: File State Backend (checkpoints: 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous: TRUE, fileStateThreshold: -1)

 2021-04-07 12:16:02,343 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591 ms).
 2021-04-07 12:16:11,951 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job 8943d16e22b8aaf65d6b9e2b8bd54113.
 2021-04-07 12:16:12,483 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411 ms).

 Thanks,
 Vijay

>>>


Re: Hive DDL TBLPROPERTIES not taking effect

2021-04-08 Thread HunterXHunter
Hi,
1: When I set it up I did use partition-time, and set the checkpoint interval to 60s. But I
found that the watermark was never generated or updated, so my data could never be committed.
I'd like to know why the watermark is not generated. With process-time it worked fine.
2: Writing to Hive has a small-file problem, so I use the file sink to configure file
compaction and control file sizes. But writing files that way cannot update the Hive
metastore, so Hive cannot query the data.

I'd like to know how to solve the Hive small-file problem - is T+1 (next-day) compaction
really the only option?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

[Subject garbled in the archive]

2021-04-08 Thread [sender garbled]buaa
[The body of this message is garbled in the archive; it mentions the Flink JobMaster, the job
graph, and the execution graph.]

Flink Metrics emitted from a Kubernetes Application Cluster

2021-04-08 Thread Claude M
Hello,

I've set up Flink as an Application Cluster in Kubernetes.  Now I'm looking
into monitoring the Flink cluster in Datadog.  This is what is configured
in the flink-conf.yaml to emit metrics:

metrics.scope.jm: flink.jobmanager
metrics.scope.jm.job: flink.jobmanager.job
metrics.scope.tm: flink.taskmanager
metrics.scope.tm.job: flink.taskmanager.job
metrics.scope.task: flink.task
metrics.scope.operator: flink.operator
metrics.reporter.dghttp.class:
org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: {{ datadog_api_key }}
metrics.reporter.dghttp.tags: environment: {{ environment }}

When it gets to Datadog, though, the metrics for flink.jobmanager and
flink.taskmanager are filtered by host, which is the pod IP. However, I
would like it to use the pod name. How can this be accomplished?
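
One possible approach (an assumption on my part, not something the Datadog reporter does out
of the box) is to expose the pod name through the Kubernetes downward API and add it as an
extra reporter tag; how the value ends up in the rendered flink-conf.yaml depends on your
templating (Helm values, an init script, etc.). A sketch:

# pod spec: make the pod name available to the container
env:
  - name: POD_NAME
    valueFrom:
      fieldRef:
        fieldPath: metadata.name

# flink-conf.yaml, rendered per pod with the value substituted in:
metrics.reporter.dghttp.tags: environment: {{ environment }}, pod_name: {{ pod_name }}

The metrics would then be filterable by the pod_name tag in Datadog, even though the host tag
stays the pod IP.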


Thanks


how to submit jobs remotely when a rest proxy like nginx is used and the REST endpoint is bound to the loopback interface?

2021-04-08 Thread Ming Li
Hi,

The official Flink documentation clearly states that "Simple mutual
authentication may be enabled by configuration if authentication of
connections to the REST endpoint is required, but we recommend to deploy a
"side car proxy": Bind the REST endpoint to the loopback interface (or the
pod-local interface in Kubernetes) and start a REST proxy that
authenticates and forwards the requests to Flink. Examples for proxies that
Flink users have deployed are Envoy Proxy or NGINX with MOD_AUTH."



So I am wondering: in standalone mode when HA is not enabled, when a REST
proxy like nginx is used and the REST endpoint is bound to the loopback
interface, how should we submit jobs remotely?



ps.

1. Sample flink-conf.yaml settings and nginx settings are shown below:

rest.bind-port: 9091 / rest.bind-address: 127.0.0.1 (the port and IP the REST
endpoint binds itself to on the host where it is started)

rest.port: 9091 / rest.address: 127.0.0.1 (the port and IP used by REST clients
when submitting requests, so basically they should reach the above
rest.bind-port/rest.bind-address)

[image: nginx settings screenshot, not preserved in the archive]

2. I know that we can use curl to request the nginx proxy, which
authenticates and forwards the request to Flink, as shown below: curl -v
-u user1:user1 http://10.20.39.43:8080/config (this is the address where
nginx is listening)

3. I know that we can submit jobs from the host where the JobManager is
located, as shown below:

/opt/flink-1.12.2/bin/flink run -m 127.0.0.1:9091
/opt/flink-1.12.2/examples/batch/WordCount.jar --input /tmp/README.txt
--output /tmp/flink.test.txt11  ()
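
If the proxy forwards all REST paths, one option is to drive the submission through the REST
API itself rather than the CLI (as far as I know, the CLI cannot send the proxy's basic-auth
credentials). A sketch, assuming the same nginx address as in point 2 and that /jars/* is
proxied through unchanged:

# upload the jar through the proxy
curl -u user1:user1 -X POST -F "jarfile=@/opt/flink-1.12.2/examples/batch/WordCount.jar" \
  http://10.20.39.43:8080/jars/upload

# run it, using the jar id returned by the upload call
curl -u user1:user1 -X POST -H "Content-Type: application/json" \
  -d '{"programArgs": "--input /tmp/README.txt --output /tmp/flink.test.txt"}' \
  http://10.20.39.43:8080/jars/<jar-id>/run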

Thanks!
-- 
Best Regards
Michael Li


Re: Conversion between GenericRowData and BinaryRowData

2021-04-08 Thread Benchao Li
GenericRowData and BinaryRowData are both concrete implementations of the RowData interface.
So you only need to program against RowData; you cannot assume which concrete implementation
it uses.

Regarding your question: during computation and conversion between operators there are many
places where BinaryRowData is constructed; the typical one is serialization, which always
serializes rows as BinaryRowData.
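
To illustrate the point, a small sketch of my own (not taken from the connector code) that
touches rows only through the RowData interface, so it behaves the same whether a
GenericRowData or a BinaryRowData shows up at runtime; it assumes a row schema of
(BIGINT, STRING):

import org.apache.flink.table.data.RowData;

public class RowDataAccess {
    public static String describe(RowData row) {
        long id = row.getLong(0);                   // field 0: BIGINT
        String name = row.getString(1).toString();  // field 1: STRING (returned as StringData)
        return id + ":" + name;
    }
}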

Luna Wong wrote on Thursday, April 8, 2021, at 7:36 PM:

> I see that the Kafka connector source code produces GenericRowData, but by the time it
> reaches the JDBC sink the type has become BinaryRowData. What is the rule by which the
> runtime layer converts GenericRowData into BinaryRowData?
>


-- 

Best,
Benchao Li


Re: Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-08 Thread [sender name garbled]
I have tried adding 'classloader.parent-first-patterns.additional:
"ru.yandex.clickhouse"' to the Flink config, but the problem still exists.
Is there a lightweight way to put the ClickHouse JDBC driver in Flink's lib/ folder?
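
In case it helps, the usual lightweight way is to mark the driver as provided in the job pom
(so it stays out of the fat jar) and copy the same jar into the distribution's lib/ once,
e.g. by baking it into the image used for the standalone Kubernetes session. The version
below is only an example, and if the plain jar misses transitive dependencies the shaded
variant may be needed instead:

<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.6</version>
    <scope>provided</scope>
</dependency>

# once per Flink image/node, then restart the cluster so lib/ is re-scanned:
cp clickhouse-jdbc-0.2.6.jar $FLINK_HOME/lib/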



------------------ Original message ------------------
From: "Maciek Próchniak"

Did you put the ClickHouse JDBC driver on the Flink main classpath (in the lib folder)
and not in the user jar - as described here:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/debugging/debugging_classloading.html#unloading-of-dynamically-loaded-classes-in-user-code?

When we encountered Metaspace leaks recently, in quite a few cases it turned out that the
problem was the JDBC driver in the user classloader which was registered by DriverManager
and caused a classloader leak.

maciek

On 08.04.2021 11:42, [sender name garbled] wrote:

My application program looks like this. Does this structure have some problem?

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        int i = 0;
        while (i < 100) {
            try {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRuntimeMode(RuntimeExecutionMode.BATCH);
                env.setParallelism(Parallelism);

                EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner()
                        .inStreamingMode().build();
                StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

                bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
                Table t = bsTableEnv.sqlQuery(query);

[the rest of the quoted program is garbled in this archive entry; a readable copy appears
later in this digest under "Re: 回复: period batch job lead to OutOfMemoryError: Metaspace
problem"]

https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-metaspace

   Best,
   Yangze Guo

   On Tue, Apr 6, 2021 at 4:22 PM [sender name garbled] <495635...@qq.com> wrote:

batch job:
read data from s3 by sql, then by some operators, and write data to clickhouse and kafka.
after some times, the task-manager quit with OutOfMemoryError: Metaspace.

env:
flink version: 1.12.2
task-manager slot count: 5
deployment: standalone kubernetes session
dependencies:
   
 

Re: Flink 1.13 and CSV (batch) writing

2021-04-08 Thread Kurt Young
My DDL is:

CREATE TABLE csv (
   id BIGINT,
   name STRING
) WITH (
   'connector' = 'filesystem',
   'path' = '.',
   'format' = 'csv'
);
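
With the new property keys, format options are prefixed with the format name, so the quote
character from the old DDL should map onto something like the following (a sketch based on
the 1.12 filesystem/CSV options linked in the quoted reply below):

CREATE TABLE csv_out (
   id BIGINT,
   name STRING
) WITH (
   'connector' = 'filesystem',
   'path' = 'file:///tmp/test-out',
   'format' = 'csv',
   'csv.field-delimiter' = ',',
   'csv.quote-character' = '"'
);

Overwriting is then expressed with INSERT OVERWRITE, as in the session above, rather than a
write-mode property.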

Best,
Kurt


On Fri, Apr 9, 2021 at 10:00 AM Kurt Young  wrote:

> Hi Flavio,
>
> We would recommend you to use new table source & sink interfaces, which
> have different
> property keys compared to the old ones, e.g. 'connector' v.s.
> 'connector.type'.
>
> You can follow the 1.12 doc [1] to define your csv table, everything
> should work just fine.
>
> *Flink SQL> set table.dml-sync=true;*
>
> *[INFO] Session property has been set.*
>
>
> *Flink SQL> select * from csv;*
>
> *+--+--+*
>
> *|   id | name |*
>
> *+--+--+*
>
> *|3 |c |*
>
> *+--+--+*
>
> *Received a total of 1 row*
>
>
> *Flink SQL> insert overwrite csv values(4, 'd');*
>
> *[INFO] Submitting SQL update statement to the cluster...*
>
> *[INFO] Execute statement in sync mode. Please wait for the execution
> finish...*
>
> *[INFO] Complete execution of the SQL update statement.*
>
>
> *Flink SQL> select * from csv;*
>
> *+--+--+*
>
> *|   id | name |*
>
> *+--+--+*
>
> *|4 |d |*
>
> *+--+--+*
>
> *Received a total of 1 row*
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>
> Best,
> Kurt
>
>
> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier 
> wrote:
>
>> Hi Till,
>> since I was using the same WITH-clause both for reading and writing I
>> discovered that overwrite is actually supported in the Sinks, while in the
>> Sources an exception is thrown (I was thinking that those properties were
>> simply ignored).
>> However the quote-character is not supported in the sinks: is this a bug
>> or is it the intended behaviour?
>> Here is a minimal example that reproduces the problem (put in
>> /tmp/test.csv something like '1,hello' or '2,hi').
>>
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.TableEnvironment;
>>
>> public class FlinkCsvTest {
>>   public static void main(String[] args) throws Exception {
>> final EnvironmentSettings envSettings =
>>
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> final TableEnvironment tableEnv =
>> TableEnvironment.create(envSettings);
>> // ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>> // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
>> final String tableInName = "testTableIn";
>> final String createInTableDdl = getSourceDdl(tableInName,
>> "/tmp/test.csv"); //
>>
>> final String tableOutName = "testTableOut";
>> final String createOutTableDdl = getSinkDdl(tableOutName,
>> "/tmp/test-out.csv"); //
>> tableEnv.executeSql(createInTableDdl);
>> tableEnv.executeSql(createOutTableDdl);
>>
>> Table tableIn = tableEnv.from(tableInName);
>> Table tableOut = tableEnv.from(tableOutName);
>> tableIn.insertInto(tableOutName);
>> // tableEnv.toDataSet(table, Row.class).print();
>> tableEnv.execute("TEST read/write");
>>
>>   }
>>
>>   private static String getSourceDdl(String tableName, String filePath) {
>> return "CREATE TABLE " + tableName + " (\n" + //
>> " `id` BIGINT,\n" + //
>> " `name` STRING) WITH (\n" + //
>> " 'connector.type' = 'filesystem',\n" + //
>> " 'connector.property-version' = '1',\n" + //
>> " 'connector.path' = '" + filePath + "',\n" + //
>> " 'format.type' = 'csv',\n" + //
>> " 'format.field-delimiter' = ',',\n" + //
>>  //   " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
>> " 'format.property-version' = '1',\n" + //
>> " 'format.quote-character' = '\"',\n" + //
>> " 'format.ignore-first-line' = 'false'" + //
>> ")";
>>   }
>>
>>   private static String getSinkDdl(String tableName, String filePath) {
>> return "CREATE TABLE " + tableName + " (\n" + //
>> " `id` BIGINT,\n" + //
>> " `name` STRING) WITH (\n" + //
>> " 'connector.type' = 'filesystem',\n" + //
>> " 'connector.property-version' = '1',\n" + //
>> " 'connector.path' = '" + filePath + "',\n" + //
>> " 'format.type' = 'csv',\n" + //
>> " 'format.field-delimiter' = ',',\n" + //
>> " 'format.num-files' = '1',\n" + //
>> " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks
>> only)
>> " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
>> " 'format.property-version' = '1'\n" + //
>> ")";

Re: Why does flink-quickstart-scala suggests adding connector dependencies in the default scope, while Flink Hive integration docs suggest the opposite

2021-04-08 Thread Yik San Chan
Hi Till, I have 2 follow-ups.

(1) Why is Hive special, while for connectors such as kafka, the docs
suggest simply bundling the kafka connector dependency with my user code?

(2) it seems the document misses the "before you start the cluster" part -
does it always require a cluster restart whenever the /lib directory
changes?

Thanks.

Best,
Yik San

On Fri, Apr 9, 2021 at 1:07 AM Till Rohrmann  wrote:

> Hi Yik San,
>
> for future reference, I copy my answer from the SO here:
>
> The reason for this difference is that for Hive it is recommended to start
> the cluster with the respective Hive dependencies. The documentation [1]
> states that it's best to put the dependencies into the lib directory before
> you start the cluster. That way the cluster is enabled to run jobs which
> use Hive. At the same time, you don't have to bundle this dependency in the
> user jar which reduces its size. However, there shouldn't be anything
> preventing you from bundling the Hive dependency with your user code if you
> want to.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#dependencies
>
> Cheers,
> Till
>
> On Thu, Apr 8, 2021 at 11:41 AM Yik San Chan 
> wrote:
>
>> The question is cross-posted on Stack Overflow
>> https://stackoverflow.com/questions/67001326/why-does-flink-quickstart-scala-suggests-adding-connector-dependencies-in-the-de
>> .
>>
>> ## Connector dependencies should be in default scope
>>
>> This is what [flink-quickstart-scala](
>> https://github.com/apache/flink/blob/d12eeedfac6541c3a0711d1580ce3bd68120ca90/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml#L84)
>> suggests:
>>
>> ```
>> [the connector dependency XML was stripped when this message was archived]
>> ```
>>
>> It also aligns with [Flink project configuration](
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies
>> ):
>>
>> > We recommend packaging the application code and all its required
>> dependencies into one jar-with-dependencies which we refer to as the
>> application jar. The application jar can be submitted to an already running
>> Flink cluster, or added to a Flink application container image.
>> >
>> > Important: For Maven (and other build tools) to correctly package the
>> dependencies into the application jar, these application dependencies must
>> be specified in scope compile (unlike the core dependencies, which must be
>> specified in scope provided).
>>
>> ## Hive connector dependencies should be in provided scope
>>
>> However, [Flink Hive Integration docs](
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#program-maven)
>> suggests the opposite:
>>
>> > If you are building your own program, you need the following
>> dependencies in your mvn file. It’s recommended not to include these
>> dependencies in the resulting jar file. You’re supposed to add dependencies
>> as stated above at runtime.
>>
>> ## Why?
>>
>> Thanks!
>>
>> Best,
>> Yik San
>>
>


Re: Flink 1.13 and CSV (batch) writing

2021-04-08 Thread Kurt Young
Hi Flavio,

We would recommend you to use new table source & sink interfaces, which
have different
property keys compared to the old ones, e.g. 'connector' v.s.
'connector.type'.

You can follow the 1.12 doc [1] to define your csv table, everything should
work just fine.

*Flink SQL> set table.dml-sync=true;*

*[INFO] Session property has been set.*


*Flink SQL> select * from csv;*

*+--+--+*

*|   id | name |*

*+--+--+*

*|3 |c |*

*+--+--+*

*Received a total of 1 row*


*Flink SQL> insert overwrite csv values(4, 'd');*

*[INFO] Submitting SQL update statement to the cluster...*

*[INFO] Execute statement in sync mode. Please wait for the execution
finish...*

*[INFO] Complete execution of the SQL update statement.*


*Flink SQL> select * from csv;*

*+--+--+*

*|   id | name |*

*+--+--+*

*|4 |d |*

*+--+--+*

*Received a total of 1 row*

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html

Best,
Kurt


On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier 
wrote:

> Hi Till,
> since I was using the same WITH-clause both for reading and writing I
> discovered that overwrite is actually supported in the Sinks, while in the
> Sources an exception is thrown (I was thinking that those properties were
> simply ignored).
> However the quote-character is not supported in the sinks: is this a bug
> or is it the intended behaviour?
> Here is a minimal example that reproduces the problem (put in
> /tmp/test.csv something like '1,hello' or '2,hi').
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class FlinkCsvTest {
>   public static void main(String[] args) throws Exception {
> final EnvironmentSettings envSettings =
>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
> // ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
> final String tableInName = "testTableIn";
> final String createInTableDdl = getSourceDdl(tableInName,
> "/tmp/test.csv"); //
>
> final String tableOutName = "testTableOut";
> final String createOutTableDdl = getSinkDdl(tableOutName,
> "/tmp/test-out.csv"); //
> tableEnv.executeSql(createInTableDdl);
> tableEnv.executeSql(createOutTableDdl);
>
> Table tableIn = tableEnv.from(tableInName);
> Table tableOut = tableEnv.from(tableOutName);
> tableIn.insertInto(tableOutName);
> // tableEnv.toDataSet(table, Row.class).print();
> tableEnv.execute("TEST read/write");
>
>   }
>
>   private static String getSourceDdl(String tableName, String filePath) {
> return "CREATE TABLE " + tableName + " (\n" + //
> " `id` BIGINT,\n" + //
> " `name` STRING) WITH (\n" + //
> " 'connector.type' = 'filesystem',\n" + //
> " 'connector.property-version' = '1',\n" + //
> " 'connector.path' = '" + filePath + "',\n" + //
> " 'format.type' = 'csv',\n" + //
> " 'format.field-delimiter' = ',',\n" + //
>  //   " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
> " 'format.property-version' = '1',\n" + //
> " 'format.quote-character' = '\"',\n" + //
> " 'format.ignore-first-line' = 'false'" + //
> ")";
>   }
>
>   private static String getSinkDdl(String tableName, String filePath) {
> return "CREATE TABLE " + tableName + " (\n" + //
> " `id` BIGINT,\n" + //
> " `name` STRING) WITH (\n" + //
> " 'connector.type' = 'filesystem',\n" + //
> " 'connector.property-version' = '1',\n" + //
> " 'connector.path' = '" + filePath + "',\n" + //
> " 'format.type' = 'csv',\n" + //
> " 'format.field-delimiter' = ',',\n" + //
> " 'format.num-files' = '1',\n" + //
> " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
> " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
> " 'format.property-version' = '1'\n" + //
> ")";
>   }
> }
>
> Thanks for the support,
> Flavio
>
>
> On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann  wrote:
>
>> Hi Flavio,
>>
>> I tried to execute the code snippet you have provided and I could not
>> reproduce the problem.
>>
>> Concretely I am running this code:
>>
>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>> .useBlinkPlanner()
>> .inStreamingMode()
>> 

Reading two unrelated files - not sure how to express this with Flink

2021-04-08 Thread ggc

Hi,

First read file A; for each line read, take that record and look up the matching row in
another file B (there are matching rules). How can this scenario be expressed with Flink?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Async + Broadcast?

2021-04-08 Thread Alex Cruise
Thanks Arvid! I'm not completely clear on where to apply your suggestions.

I've included a sketch of my job below, and I have a couple questions:

1. It looks like enableObjectReuse() is a global setting, should I worry
about whether I'm using any mutable data between stages?
2. Should I disableChaining() on BOTH broadcast-dependent stages, or just
the one immediately preceding the async?

Thanks!

-0xe1a

*Types:*

/** all the configs for a given tenant, as of the time when a change was
observed */
data class ConfigSnapshot(
  tenantId: Long,
  timestamp: Instant,
  configs: Map
)

/** parse raw strings from input, rejecting those for unconfigured tenants
*/
class Parse(
  initialConfigs: Map<Long, ConfigSnapshot>
) : BroadcastProcessFunction<String, ConfigSnapshot, Record>() {
  override fun processBroadcastElement(
    value: ConfigSnapshot,
    ctx: Context,
    out: Collector<Record>
  ) {
    val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
    snapshots.put(value.tenantId, value)
  }

  override fun processElement(value: String, ctx: ReadOnlyContext, out: Collector<Record>) {
    val validTenantIds = ctx.getBroadcastState(configSnapshotDescriptor)
      .toMap()
      .keys
      .ifEmpty { initialConfigs.keys }

    val parsed = Record(value)
    if (!validTenantIds.contains(parsed.tenantId)) {
      return
    } else {
      out.collect(parsed)
    }
  }
}

/** given a parsed record, identify which config(s) are interested in it,
and send an output value of the record tupled with the interested config */
class ValidateAndDistribute(
  initialConfigs: Map<Long, ConfigSnapshot>
) : BroadcastProcessFunction<Record, ConfigSnapshot, Pair<Record, Config>>() {
  override fun processBroadcastElement(
    value: ConfigSnapshot,
    ctx: Context,
    out: Collector<Pair<Record, Config>>
  ) {
    val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
    snapshots.put(value.tenantId, value)
  }

  override fun processElement(
    value: Record,
    ctx: ReadOnlyContext,
    out: Collector<Pair<Record, Config>>
  ) {
    val configsForThisTenant =
      ctx.getBroadcastState(configSnapshotDescriptor)
        .toMap()
        .ifEmpty { initialConfigs }
        .get(value.tenantId)
        .configs
        .orEmpty()

    val configsInterestedInThisRecord = configsForThisTenant.values.filter {
      it.interestedIn(value)
    }

    for ((configId, config) in configsInterestedInThisRecord) {
      out.collect(value to config)
    }
  }
}

/** given a pair of Record and Config, run the async operation and send an
enriched record including the result */
class Enrich() : RichAsyncFunction<Pair<Record, Config>, EnrichedRecord>()

*Job Pseudocode:*

val initialConfigs: Map<Long, ConfigSnapshot> = ???
val dataSource: DataStream<String> = ???
val configSource: DataStream<ConfigSnapshot> = ??? // from a legacy "while (true) { poll; sleep }" source

// the config-subscribing operators keep the broadcast state in a Map<Long, ConfigSnapshot>
val configSnapshotDescriptor = MapStateDescriptor(
  "currentConfigSnapshots",
  Long::class.java,
  ConfigSnapshot::class.java
)

// Broadcast the snapshots
val configBroadcast: BroadcastStream<ConfigSnapshot> =
  configSource.broadcast(configSnapshotDescriptor)

val parsed: DataStream<Record> = dataSource
  .connect(configBroadcast)
  .process(Parse(initialConfigs))

// input records can be duplicated now, as there may be multiple Configs that are interested in a record
val validated: DataStream<Pair<Record, Config>> = parsed
  .connect(configBroadcast)
  .process(ValidateAndDistribute(initialConfigs))

val enriched: DataStream<EnrichedRecord> = AsyncDataStream.unorderedWait(
  validated,
  Enrich(),
  5L,
  TimeUnit.SECONDS
)
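
For what it's worth, a minimal sketch of where the first knob sits (untested, with env being
the StreamExecutionEnvironment that builds the job above):

// global ExecutionConfig flag: skip defensive copies between chained operators.
// With reuse enabled, records must not be mutated after they are collected,
// which is what question 1 is really about.
env.config.enableObjectReuse()

// disableChaining() is a per-operator call on a SingleOutputStreamOperator,
// e.g. someStage.disableChaining(); which of the two broadcast-dependent
// stages needs it is exactly question 2, so it is left open here.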





On Wed, Apr 7, 2021 at 11:28 PM Arvid Heise  wrote:

> Hi Alex,
>
> your approach is completely valid. What you want to achieve is that you
> have a chain between your state managing operator and the consuming async
> operations. In that way, you have no serialization overhead.
>
> To achieve that you want to
> - use Flink 1.11+ [1]
> - make sure that if you have a legacy source, you disableChaining before
> your state managing operator as asyncIO cannot be (transitively) chained to
> legacy sources. So it should be source -> ... -> (forward channel) ->
> (state managing operator -> async1 -> async2 -> ... ) ... -> sink
> - enableObjectReuse [2] to avoid copying of objects
>
> [1] https://issues.apache.org/jira/browse/FLINK-16219
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/execution_configuration.html
>
> On Thu, Apr 8, 2021 at 3:26 AM Alex Cruise  wrote:
>
>> Thanks Austin! I'll proceed with my idea, and keep the bootstrap config :)
>>
>> -0xe1a
>>
>> On Wed, Apr 7, 2021 at 2:18 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey Alex,
>>>
>>> I'm not sure if there is a best practice here, but what I can tell you
>>> is that I worked on a job that did exactly what you're suggesting with a
>>> non-async operator to create a [record, config] tuple, which was then
>>> passed to the async stage. Our config objects were also not tiny (~500kb)
>>> and our pipeline not huge (~1M records/day and 1GB data/ day), but this
>>> setup worked quite well. I'd say if latency isn't your most important
>>> metric, or if your pipeline is a 

Re: FLINK Kinesis consumer Checkpointing data loss

2021-04-08 Thread Vijayendra Yadav
Thanks it was working fine with: bin/flink run  -s
s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/
\

On Thu, Apr 8, 2021 at 11:42 AM Vijayendra Yadav 
wrote:

> Hi Arvid,
>
> Thanks for your response. I did not restart from the checkpoint. I assumed
> Flink would look for a checkpoint upon restart automatically.
>
> *Should I restart like below?*
>
> bin/flink run  -s
> s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/
> \
>
> Thanks,
> Vijay
>
> On Thu, Apr 8, 2021 at 12:52 AM Arvid Heise  wrote:
>
>> Hi Vijay,
>>
>> edit: After re-reading your message: are you sure that you restart from a
>> checkpoint/savepoint? If you just start the application anew and use LATEST
>> initial position, this is the expected behavior.
>>
>> --- original intended answer if you restart from checkpoint
>>
>> this is definitely not the expected behavior.
>>
>> To exclude certain error sources:
>> - Could you double-check if this is also happening if you don't use
>> unaligned checkpoints? (I don't really think this is because of unaligned
>> checkpoint, but it's better to be sure and we want to reduce the possible
>> error sources)
>> - Can you see the missing messages still in Kinesis?
>> - Could you extract all log INFO statements from
>> org.apache.flink.streaming.connectors.kinesis and attach them here?
>> - How long did you wait with recovery?
>>
>>
>>
>> On Wed, Apr 7, 2021 at 8:03 PM Vijayendra Yadav 
>> wrote:
>>
>>> Hi Team,
>>>
>>> We are trying to make sure we are not losing data when KINESIS Consumer
>>> is down.
>>>
>>> Kinesis streaming Job which has following checkpointing properties:
>>>
>>>
>>> // checkpoint every X msecs
>>> env.enableCheckpointing(Conf.getFlinkCheckpointInterval());
>>>
>>> // enable externalized checkpoints which are retained after job cancellation
>>> env.getCheckpointConfig().enableExternalizedCheckpoints(
>>>     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>
>>> // allow job recovery fallback to checkpoint when there is a more recent savepoint
>>> env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());
>>>
>>> // enables the experimental unaligned checkpoints
>>> env.getCheckpointConfig().enableUnalignedCheckpoints();
>>>
>>> // checkpoint path
>>> env.setStateBackend(new FsStateBackend(Conf.getFlinkCheckPointPath(), true));
>>>
>>> 1) We killed the Kinesis Job
>>> 2) Sent messages to KDS while Consumer was down.
>>> 3) Restarted Flink Consumer, *messages which were sent during the
>>> Consumer down period, never ingested (data loss).*
>>> 4) Re-sent messages to KDS while the consumer was still up. Messages did
>>> ingest fine.
>>>
>>> *How can I avoid data loss for #3 ??*
>>>
>>> From Logs:
>>>
>>>
>>> 2021-04-07 12:15:49,161 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: File State Backend (checkpoints: 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous: TRUE, fileStateThreshold: -1)
>>>
>>> 2021-04-07 12:16:02,343 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591 ms).
>>> 2021-04-07 12:16:11,951 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job 8943d16e22b8aaf65d6b9e2b8bd54113.
>>> 2021-04-07 12:16:12,483 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411 ms).
>>>
>>> Thanks,
>>> Vijay
>>>
>>


Re: 回复: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-08 Thread Maciek Próchniak

Hi,

Did you put the ClickHouse JDBC driver on the Flink main classpath (in the lib
folder) and not in the user jar - as described here:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/debugging/debugging_classloading.html#unloading-of-dynamically-loaded-classes-in-user-code?


When we encountered Metaspace leaks recently, in quite a few cases it
turned out that the problem was the JDBC driver in the user classloader which
was registered by DriverManager and caused a classloader leak.



maciek


On 08.04.2021 11:42, [sender name garbled] wrote:
My application program looks like this. Does this structure have some
problem?


public class StreamingJob {
public static void main(String[] args) throws Exception {
int i = 0;
while (i < 100) {
try {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setParallelism(Parallelism);

EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner()

.inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, 
bsSettings);


bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
Table t = bsTableEnv.sqlQuery(query);

DataStream<DataPoint> points = bsTableEnv.toAppendStream(t,
DataPoint.class);


DataStream weightPoints = points.map();

DataStream predictPoints = weightPoints.keyBy()
.reduce().map();

// side output
final OutputTag outPutPredict = new 
OutputTag("predict") {

};

SingleOutputStreamOperator mainDataStream = predictPoints
.process();

DataStream exStream = 
mainDataStream.getSideOutput(outPutPredict);


// write data to clickhouse
String insertIntoCKSql = "xxx";
mainDataStream.addSink(JdbcSink.sink(insertIntoCKSql, new CkSinkBuilder(),
new JdbcExecutionOptions.Builder().withBatchSize(CkBatchSize).build(),
new 
JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(CkDriverName)

.withUrl(CkUrl).withUsername(CkUser).withPassword(CkPassword).build()));

// write data to kafka
FlinkKafkaProducer producer = new FlinkKafkaProducer<>();
exStream.map().addSink(producer);

env.execute("Prediction Program");
} catch (Exception e) {
e.printStackTrace();
}
i++;
Thread.sleep(window * 1000);
}
}
}



-------------- Original message --------------
*From:* "Arvid Heise";
*Date:* Thursday, April 8, 2021, 2:33
*To:* "Yangze Guo";
*Cc:* [name garbled] <495635...@qq.com>; "user"; "guowei.mgw"; "renqschn";
*Subject:* Re: period batch job lead to OutOfMemoryError: Metaspace problem

Hi,

ChildFirstClassLoaders are created (more or less) per application jar,
and seeing so many looks like a classloader leak to me. I'd expect you
to see a new ChildFirstClassLoader popping up with each new job
submission.


Can you check who is referencing the ChildFirstClassLoader 
transitively? Usually, it's some thread that is lingering around 
because some third party library is leaking threads etc.


OneInputStreamTask is legit and just indicates that you have a job 
running with 4 slots on that TM. It should not hold any dedicated 
metaspace memory.


On Thu, Apr 8, 2021 at 4:52 AM Yangze Guo > wrote:


I went through the JM & TM logs but could not find any valuable clue.
The exception is actually thrown by kafka-producer-network-thread.
Maybe @Qingsheng could also take a look?


Best,
Yangze Guo

On Thu, Apr 8, 2021 at 10:39 AM ?? <495635...@qq.com
> wrote:
>
> I have configured to 512M, but problem still exist. Now the
memory size is still 256M.
> Attachments are TM and JM logs.
>
> Look forward to your reply.
>
> -------------- Original message --------------
> From: "Yangze Guo" <karma...@gmail.com>;
> Date: Tuesday, April 6, 2021, 6:35
> To: [name garbled] <495635...@qq.com>;
> Cc: "user" <user@flink.apache.org>; "guowei.mgw" <guowei@gmail.com>;
> Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>
> > I have tried this method, but the problem still exist.
> How much memory do you configure for it?
>
> > is 21 instances of
"org.apache.flink.util.ChildFirstClassLoader" normal
> Not quite sure about it. AFAIK, each job will have a classloader.
> Multiple tasks of the same job in the same TM will share the same
> classloader. The classloader will be removed if there is no more
task
> running on the TM. Classloader without reference will be finally
> cleanup by GC. Could you share JM and TM logs for further analysis?
> I'll also involve @Guowei Ma in this thread.
>
>
> Best,
> Yangze Guo
>
> On Tue, Apr 6, 2021 at 6:05 PM ?? <495635...@qq.com
> wrote:
> >
> > I have tried this method, but the problem still 

Re: Flink 1.13 and CSV (batch) writing

2021-04-08 Thread Flavio Pompermaier
Hi Till,
since I was using the same WITH-clause both for reading and writing I
discovered that overwrite is actually supported in the Sinks, while in the
Sources an exception is thrown (I was thinking that those properties were
simply ignored).
However the quote-character is not supported in the sinks: is this a bug or
is it the intended behaviour?
Here is a minimal example that reproduces the problem (put in
/tmp/test.csv something like '1,hello' or '2,hi').

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvTest {
  public static void main(String[] args) throws Exception {
final EnvironmentSettings envSettings =

EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
// ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
// BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
final String tableInName = "testTableIn";
final String createInTableDdl = getSourceDdl(tableInName,
"/tmp/test.csv"); //

final String tableOutName = "testTableOut";
final String createOutTableDdl = getSinkDdl(tableOutName,
"/tmp/test-out.csv"); //
tableEnv.executeSql(createInTableDdl);
tableEnv.executeSql(createOutTableDdl);

Table tableIn = tableEnv.from(tableInName);
Table tableOut = tableEnv.from(tableOutName);
tableIn.insertInto(tableOutName);
// tableEnv.toDataSet(table, Row.class).print();
tableEnv.execute("TEST read/write");

  }

  private static String getSourceDdl(String tableName, String filePath) {
return "CREATE TABLE " + tableName + " (\n" + //
" `id` BIGINT,\n" + //
" `name` STRING) WITH (\n" + //
" 'connector.type' = 'filesystem',\n" + //
" 'connector.property-version' = '1',\n" + //
" 'connector.path' = '" + filePath + "',\n" + //
" 'format.type' = 'csv',\n" + //
" 'format.field-delimiter' = ',',\n" + //
 //   " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
" 'format.property-version' = '1',\n" + //
" 'format.quote-character' = '\"',\n" + //
" 'format.ignore-first-line' = 'false'" + //
")";
  }

  private static String getSinkDdl(String tableName, String filePath) {
return "CREATE TABLE " + tableName + " (\n" + //
" `id` BIGINT,\n" + //
" `name` STRING) WITH (\n" + //
" 'connector.type' = 'filesystem',\n" + //
" 'connector.property-version' = '1',\n" + //
" 'connector.path' = '" + filePath + "',\n" + //
" 'format.type' = 'csv',\n" + //
" 'format.field-delimiter' = ',',\n" + //
" 'format.num-files' = '1',\n" + //
" 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
" 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
" 'format.property-version' = '1'\n" + //
")";
  }
}

Thanks for the support,
Flavio


On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann  wrote:

> Hi Flavio,
>
> I tried to execute the code snippet you have provided and I could not
> reproduce the problem.
>
> Concretely I am running this code:
>
> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>
> tableEnv.fromValues("foobar").execute().await();
>
> Am I missing something? Maybe you can share a minimal but fully working
> example where the problem occurs. Thanks a lot.
>
> Cheers,
> Till
>
> On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier 
> wrote:
>
>> Any help here? Moreover if I use the DataStream APIs there's no
>> left/right outer join yet..are those meant to be added in Flink 1.13 or
>> 1.14?
>>
>> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier 
>> wrote:
>>
>>> Hi to all,
>>> I'm testing writing to a CSV using Flink 1.13 and I get the following
>>> error:
>>>
>>> The matching candidates:
>>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>> Unsupported property keys:
>>> format.quote-character
>>>
>>> I create the table env using this:
>>>
>>> final EnvironmentSettings envSettings =
>>> EnvironmentSettings.newInstance()//
>>> .useBlinkPlanner()//
>>> // .inBatchMode()//
>>> .inStreamingMode()//
>>> .build();
>>> final TableEnvironment tableEnv =
>>> TableEnvironment.create(envSettings);
>>>
>>> The error is the same both with inBatchMode and inStreamingMode.
>>> Is this really not supported or am I using the wrong API?
>>>
>>> Best,
>>> Flavio
>>>
>>


Re: Flink does not cleanup some disk memory after submitting jar over rest

2021-04-08 Thread Maciek Próchniak

Hi,

don't know if this is the problem you're facing, but some time ago we 
encountered two issues connected to REST API and increased disk usage 
after each submission:


https://issues.apache.org/jira/browse/FLINK-21164

https://issues.apache.org/jira/browse/FLINK-9844

- they're closed ATM, but only 1.12.2 contains the fixes.


maciek


On 08.04.2021 19:52, Great Info wrote:


I have deployed my own Flink setup in AWS ECS: one service for the
JobManager and one service for the TaskManagers. I am running one ECS
task for the JobManager and 3 ECS tasks for the TaskManagers.


I have a kind of batch job which I upload via the Flink REST API every
day with new arguments. Each time I submit it, disk usage increases by
~600MB. The checkpoint location is S3, and I have also set
*historyserver.archive.clean-expired-jobs* to true.


Since I am running on ECS, I am not able to find out why the disk usage
increases on every jar upload and execution.


What are the flink config params I should look at to make sure the 
memory is not shooting up?




Re: FLINK Kinesis consumer Checkpointing data loss

2021-04-08 Thread Vijayendra Yadav
Hi Arvid,

Thanks for your response. I did not restart from the checkpoint. I assumed
Flink would look for a checkpoint upon restart automatically.

*Should I restart like below?*

bin/flink run  -s
s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/
\

Thanks,
Vijay

On Thu, Apr 8, 2021 at 12:52 AM Arvid Heise  wrote:

> Hi Vijay,
>
> edit: After re-reading your message: are you sure that you restart from a
> checkpoint/savepoint? If you just start the application anew and use LATEST
> initial position, this is the expected behavior.
>
> --- original intended answer if you restart from checkpoint
>
> this is definitely not the expected behavior.
>
> To exclude certain error sources:
> - Could you double-check if this is also happening if you don't use
> unaligned checkpoints? (I don't really think this is because of unaligned
> checkpoint, but it's better to be sure and we want to reduce the possible
> error sources)
> - Can you see the missing messages still in Kinesis?
> - Could you extract all log INFO statements from
> org.apache.flink.streaming.connectors.kinesis and attach them here?
> - How long did you wait with recovery?
>
>
>
> On Wed, Apr 7, 2021 at 8:03 PM Vijayendra Yadav 
> wrote:
>
>> Hi Team,
>>
>> We are trying to make sure we are not losing data when KINESIS Consumer
>> is down.
>>
>> Kinesis streaming Job which has following checkpointing properties:
>>
>>
>> // checkpoint every X msecs
>> env.enableCheckpointing(Conf.getFlinkCheckpointInterval());
>>
>> // enable externalized checkpoints which are retained after job cancellation
>> env.getCheckpointConfig().enableExternalizedCheckpoints(
>>     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>
>> // allow job recovery fallback to checkpoint when there is a more recent savepoint
>> env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());
>>
>> // enables the experimental unaligned checkpoints
>> env.getCheckpointConfig().enableUnalignedCheckpoints();
>>
>> // checkpoint path
>> env.setStateBackend(new FsStateBackend(Conf.getFlinkCheckPointPath(), true));
>>
>> 1) We killed the Kinesis Job
>> 2) Sent messages to KDS while Consumer was down.
>> 3) Restarted Flink Consumer, *messages which were sent during the
>> Consumer down period, never ingested (data loss).*
>> 4) Re-sent messages to KDS while the consumer was still up. Messages did
>> ingest fine.
>>
>> *How can I avoid data loss for #3 ??*
>>
>> From Logs:
>>
>>
>> 2021-04-07 12:15:49,161 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: File State Backend (checkpoints: 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous: TRUE, fileStateThreshold: -1)
>>
>> 2021-04-07 12:16:02,343 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591 ms).
>> 2021-04-07 12:16:11,951 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job 8943d16e22b8aaf65d6b9e2b8bd54113.
>> 2021-04-07 12:16:12,483 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411 ms).
>>
>> Thanks,
>> Vijay
>>
>


Re: Compression with rocksdb backed state

2021-04-08 Thread deepthi Sridharan
Thank you, that makes sense.

On Thu, Apr 8, 2021 at 12:37 AM Timo Walther  wrote:

> Hi Deepthi,
>
> 1. Correct
> 2. Correct
> 3. Incremental snapshots simply manage references to RocksDB's sstables.
> You can find a full explanation here [1]. Thus, the payload is a
> blackbox for Flink and Flink's compression flag has no impact. So we
> fully rely what RocksDB offers.
> 4. Correct
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> [1]
> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
>
>
> On 07.04.21 22:04, deepthi Sridharan wrote:
> > I am trying to understand this section on compression of checkpoints
> > which has me a bit confused
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression
> > <
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression
> >
> >
> > Could you please confirm if my understanding is correct:
> > 1. Compression is disabled by default for full snapshots and needs to be
> > turned on if required.
> > 2. Current versions do not support changing the compression type (snappy
> > by default) when enabled. Future versions will support configuring the
> > compression type.
> > 3. Compression is enabled by default for incremental snapshot.
> > 4. The compression type cannot be configured for incremental savepoints.
> >
> > --
> > Regards,
> > Deepthi
>
>
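
For reference, the flag Timo describes for full snapshots lives on the ExecutionConfig; a
minimal sketch:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enables (snappy) compression of full checkpoints and savepoints
env.getConfig().setUseSnapshotCompression(true);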

-- 
Regards,
Deepthi


Flink does not cleanup some disk memory after submitting jar over rest

2021-04-08 Thread Great Info
I have deployed my own Flink setup in AWS ECS: one service for the JobManager
and one service for the TaskManagers. I am running one ECS task for the
JobManager and 3 ECS tasks for the TaskManagers.

I have a kind of batch job which I upload via the Flink REST API every day with
new arguments. Each time I submit it, disk usage increases by ~600MB. The
checkpoint location is S3, and I have also set
*historyserver.archive.clean-expired-jobs* to true.

Since I am running on ECS, I am not able to find out why the disk usage
increases on every jar upload and execution.

What are the flink config params I should look at to make sure the memory
is not shooting up?


Re: Why does flink-quickstart-scala suggests adding connector dependencies in the default scope, while Flink Hive integration docs suggest the opposite

2021-04-08 Thread Till Rohrmann
Hi Yik San,

for future reference, I copy my answer from the SO here:

The reason for this difference is that for Hive it is recommended to start
the cluster with the respective Hive dependencies. The documentation [1]
states that it's best to put the dependencies into the lib directory before
you start the cluster. That way the cluster is enabled to run jobs which
use Hive. At the same time, you don't have to bundle this dependency in the
user jar which reduces its size. However, there shouldn't be anything
preventing you from bundling the Hive dependency with your user code if you
want to.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#dependencies
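
Concretely, the split can be expressed in the pom by declaring the Hive pieces with provided
scope and dropping the corresponding jars into <FLINK_HOME>/lib before the cluster starts; a
sketch, with versions only as examples that must match your Flink/Hive setup:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.11</artifactId>
    <version>1.12.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.3.4</version>
    <scope>provided</scope>
</dependency>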

Cheers,
Till

On Thu, Apr 8, 2021 at 11:41 AM Yik San Chan 
wrote:

> The question is cross-posted on Stack Overflow
> https://stackoverflow.com/questions/67001326/why-does-flink-quickstart-scala-suggests-adding-connector-dependencies-in-the-de
> .
>
> ## Connector dependencies should be in default scope
>
> This is what [flink-quickstart-scala](
> https://github.com/apache/flink/blob/d12eeedfac6541c3a0711d1580ce3bd68120ca90/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml#L84)
> suggests:
>
> ```
> [the connector dependency XML was stripped when this message was archived]
> ```
>
> It also aligns with [Flink project configuration](
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies
> ):
>
> > We recommend packaging the application code and all its required
> dependencies into one jar-with-dependencies which we refer to as the
> application jar. The application jar can be submitted to an already running
> Flink cluster, or added to a Flink application container image.
> >
> > Important: For Maven (and other build tools) to correctly package the
> dependencies into the application jar, these application dependencies must
> be specified in scope compile (unlike the core dependencies, which must be
> specified in scope provided).
>
> ## Hive connector dependencies should be in provided scope
>
> However, [Flink Hive Integration docs](
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#program-maven)
> suggests the opposite:
>
> > If you are building your own program, you need the following
> dependencies in your mvn file. It’s recommended not to include these
> dependencies in the resulting jar file. You’re supposed to add dependencies
> as stated above at runtime.
>
> ## Why?
>
> Thanks!
>
> Best,
> Yik San
>


Re: Flink 1.13 and CSV (batch) writing

2021-04-08 Thread Till Rohrmann
Hi Flavio,

I tried to execute the code snippet you have provided and I could not
reproduce the problem.

Concretely I am running this code:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

tableEnv.fromValues("foobar").execute().await();

Am I missing something? Maybe you can share a minimal but fully working
example where the problem occurs. Thanks a lot.

Cheers,
Till

On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier 
wrote:

> Any help here? Moreover if I use the DataStream APIs there's no left/right
> outer join yet..are those meant to be added in Flink 1.13 or 1.14?
>
> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> I'm testing writing to a CSV using Flink 1.13 and I get the following
>> error:
>>
>> The matching candidates:
>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>> Unsupported property keys:
>> format.quote-character
>>
>> I create the table env using this:
>>
>> final EnvironmentSettings envSettings =
>> EnvironmentSettings.newInstance()//
>> .useBlinkPlanner()//
>> // .inBatchMode()//
>> .inStreamingMode()//
>> .build();
>> final TableEnvironment tableEnv =
>> TableEnvironment.create(envSettings);
>>
>> The error is the same both with inBatchMode and inStreamingMode.
>> Is this really not supported or am I using the wrong API?
>>
>> Best,
>> Flavio
>>
>


Re: flink cdc postgres connector 1.1 - heartbeat.interval.ms - WAL consumption

2021-04-08 Thread bat man
Thanks Till.

Hi Jark,

Any inputs, going through the code of 1.1 and 1.3 in the meantime.

Thanks,
Hemant

On Thu, Apr 8, 2021 at 3:52 PM Till Rohrmann  wrote:

> Hi Hemant,
>
> I am pulling in Jark who is most familiar with Flink's cdc connector. He
> might also be able to tell whether the fix can be backported.
>
> Cheers,
> Till
>
> On Thu, Apr 8, 2021 at 10:42 AM bat man  wrote:
>
>> Anyone who has faced similar issues with cdc with Postgres.
>>
>> I see the restart_lsn and confirmed_flush_lsn constant since the snapshot
>> replication records were streamed even though I have tried inserting
>> a record in the whitelisted table.
>>
>> select * from pg_replication_slots;
>>  slot_name   | plugin   | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
>> -------------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
>>  stream_cdc3 | pgoutput | logical   |  16411 | test_cdc | f         | t      |       1146 |      |         6872 | 62/34000828 | 62/34000860
>>
>> I have passed heartbeat.interval.ms = 1000 and could see the heartbeat
>> events streamed to Flink; however, the transaction log disk usage and the
>> oldest replication slot lag keep increasing. From [1] I have also tried
>> this -
>>
>> For other decoder plug-ins, it is recommended to create a supplementary
>> table that is not monitored by Debezium.
>>
>> A separate process would then periodically update the table (either
>> inserting a new event or updating the same row all over). PostgreSQL then
>> will invoke Debezium which will confirm the latest LSN and allow the
>> database to reclaim the WAL space.
>>
>> [image: Screenshot 2021-04-08 at 2.07.18 PM.png]
>>
>> [image: Screenshot 2021-04-08 at 2.07.52 PM.png]
>>
>> [1] -
>> https://debezium.io/documentation/reference/1.0/connectors/postgresql.html#wal-disk-space
>>
>> Thanks.
>>
>> On Wed, Apr 7, 2021 at 12:51 PM bat man  wrote:
>>
>>> Hi there,
>>>
>>> I am using Flink 1.11 and cdc connector 1.1 to stream changes from a
>>> Postgres table. I see the WAL consumption increasing gradually even
>>> though writes to the tables are very infrequent.
>>>
>>> I am using AWS RDS, from [1] I understand that setting the parameter
>>> heartbeat.interval.ms solves this WAL consumption issue. However, I
>>> tried setting this with no success.
>>>
>>> I found a bug [2] which seems to be taking care of committing the lsn
>>> for the db to release the wal. however this seems to be only fixed in 1.3
>>> which is compatible with flink 1.12.1. Is there any way this can be fixed
>>> in 1.11.1. Since I am using EMR and the latest flink version available is
>>> 1.11.
>>>
>>>
>>> [1] -
>>> https://debezium.io/documentation/reference/connectors/postgresql.html
>>> [2] - https://github.com/ververica/flink-cdc-connectors/issues/97
>>>
>>> Thanks.
>>> Hemant
>>>
>>


Unsubscribe

2021-04-08 Thread 程鑫
Unsubscribe

Re: UniqueKey constraint is lost with multiple sources join in SQL

2021-04-08 Thread Kai Fu
As identified with the community, it's a bug; more information is in issue
https://issues.apache.org/jira/browse/FLINK-22113

On Sat, Apr 3, 2021 at 8:43 PM Kai Fu  wrote:

> Hi team,
>
> We have a use case to join multiple data sources to generate a
> continuous updated view. We defined primary key constraint on all the input
> sources and all the keys are the subsets in the join condition. All joins
> are left join.
>
> In our case, the first two inputs can produce a *JoinKeyContainsUniqueKey* input
> spec, which is good and performant. But when it comes to the third input
> source, it's joined with the intermediate output table of the first two
> input tables, and the intermediate table does not carry key constraint
> information (although the third source input table does), so it results in a
> *NoUniqueKey* input spec. Given NoUniqueKey inputs have dramatic
> performance implications per the Force Join Unique Key
> 
> email thread, we want to know if there is any mitigation plan for this.
>
> One solution I can come up with is to write the intermediate result into
> some place like Kafka with unique constraint and join with the
> third source, while it requires extra resources. Any other suggestion on
> this? Thanks.
>
> --
> *Best regards,*
> *- Kai*
>


-- 
*Best wishes,*
*- Kai*


Re: Hive DDL TBLPROPERTIES not taking effect

2021-04-08 Thread Rui Li
Hi,

If sink.partition-commit.trigger is set to process-time, the timestamp is not extracted from
the partition values; only partition-time does that, but note that partition-time commits
need to be triggered by a watermark. More details are in the documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#sink-partition-commit-trigger

Also, if you suspect a syntax problem kept the properties from being set correctly, you can
DESCRIBE the table in Hive to verify.
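
A sketch of the combination described above - the source table declares an event-time
watermark, and the Hive sink table then uses partition-time commits (the Kafka options are
placeholders, and the timestamp column must be TIMESTAMP(3) to carry a watermark):

CREATE TABLE kafka_src (
  id BIGINT,
  type INT,
  user_id BIGINT,
  title STRING,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE
) WITH (
  'connector' = 'kafka',
  'topic' = 'my_topic',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'json'
);

-- and on the Hive table:
--   'sink.partition-commit.trigger' = 'partition-time',
--   'sink.partition-commit.delay' = '1 h'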

On Thu, Apr 8, 2021 at 6:07 PM eriendeng  wrote:

> SET table.sql-dialect=hive;
> CREATE TABLE hive_catalog.test_db.test_insert_tb (
> id BIGINT,
> type INT,
> user_id BIGINT,
> title STRING,
> ts TIMESTAMP
> ) PARTITIONED BY (add_date STRING, add_hour STRING) STORED AS orc
> TBLPROPERTIES (
> 'sink.partition-commit.trigger' = 'process-time',
> 'sink.partition-commit.delay' = '0s',
> 'sink.partition-commit.policy.kind' = 'metastore,success-file',
> 'partition.time-extractor.kind' = 'custom',
> 'partition.time-extractor.timestamp-pattern' = '$add_date $add_hour',
> 'partition.time-extractor.class' = '.YMDPartTimeExtractor'
> );
>
>
> I created the table above using the Hive dialect. After starting the job it reads from
> Kafka and writes into the table, but none of the TBLPROPERTIES seem to take effect. The
> data is visible on HDFS, but Hive has no partitions, and the partitioning also does not
> follow my class (my class needs to format the partitions as MMdd, HH date patterns and
> also fix the +8 timezone). There is no exception in the TM or JM either. Has anyone seen
> this before? Is some parameter wrong so that none of them take effect?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best regards!
Rui Li


Conversion between GenericRowData and BinaryRowData

2021-04-08 Thread Luna Wong
Looking at the Kafka connector source code, it produces GenericRowData, but by the time records reach the
JDBC sink they have become BinaryRowData. What rules does the runtime use to convert GenericRowData into BinaryRowData?


Re: Reducing Task Manager Count Greatly Increases Savepoint Restore

2021-04-08 Thread Till Rohrmann
Hi Kevin,

when decreasing the TaskManager count I assume that you also decrease the
parallelism of the Flink job. There are three aspects which can then cause
a slower recovery.

1) Each Task gets a larger key range assigned. Therefore, each TaskManager
has to download more data in order to restart the Task. Moreover, there are
fewer nodes downloading larger portions of the data (less parallelization).
2) If you rescaled the parallelism, then it can happen that a Task gets a
key range assigned which requires downloading of multiple key range parts
from the previous run/savepoint. The new key range might not need all the
data from the savepoint parts and hence you download some data which is not
really used in the end.
3) When rescaling the job, Flink has to rebuild the RocksDB instance,
which is an expensive and slow operation. What happens is that Flink
creates a RocksDB instance for every savepoint part it needs for its key
range and then extracts only the part relevant for its key range into a
new RocksDB instance. This causes a lot of read and write amplification.

Cheers,
Till

On Wed, Apr 7, 2021 at 4:07 PM Kevin Lam  wrote:

> Hi all,
>
> We are trying to benchmark savepoint size vs. restore time.
>
> One thing we've observed is that when we reduce the number of task
> managers, the time to restore from a savepoint increases drastically:
>
> 1/ Restoring from 9.7tb savepoint onto 156 task managers takes 28 minutes
> 2/ Restoring from the same savepoint onto 30 task managers takes over 3
> hours
>
> *Is this expected? How does the restore process work? Is this just a
> matter of having lower restore parallelism for 30 task managers vs 156 task
> managers? *
>
> Some details
>
> - Running on kubernetes
> - Used Rocksdb with a local ssd for state backend
> - Savepoint is hosted on GCS
> - The smaller task manager case is important to us because we expect to
> deploy our application with a high number of task managers, and downscale
> once a backfill is completed
>
> Differences between 1/ and 2/:
>
> 2/ has decreased task manager count 156 -> 30
> 2/ has decreased operator parallelism by a factor of ~10
> 2/ uses a striped SSD (3 ssds mounted as a single logical volume) to hold
> rocksdb files
>
> Thanks in advance for your help!
>


Re: flink cdc postgres connector 1.1 - heartbeat.interval.ms - WAL consumption

2021-04-08 Thread Till Rohrmann
Hi Hemant,

I am pulling in Jark who is most familiar with Flink's cdc connector. He
might also be able to tell whether the fix can be backported.

Cheers,
Till

On Thu, Apr 8, 2021 at 10:42 AM bat man  wrote:

> Has anyone faced similar issues with CDC and Postgres?
>
> I see that restart_lsn and confirmed_flush_lsn have stayed constant since the
> snapshot replication records were streamed, even though I have tried inserting
> a record into the whitelisted table.
>
> select * from pg_replication_slots;
>  slot_name   |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
> -------------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
>  stream_cdc3 | pgoutput | logical   |  16411 | test_cdc | f         | t      |       1146 |      |         6872 | 62/34000828 | 62/34000860
>
> I have passed heartbeat.interval.ms = 1000 and can see the heartbeat events
> streamed to Flink; however, the transaction log disk usage and the oldest
> replication slot lag keep increasing. From [1] I have also tried this -
>
> For other decoder plug-ins, it is recommended to create a supplementary
> table that is not monitored by Debezium.
>
> A separate process would then periodically update the table (either
> inserting a new event or updating the same row all over). PostgreSQL then
> will invoke Debezium which will confirm the latest LSN and allow the
> database to reclaim the WAL space.
>
> [image: Screenshot 2021-04-08 at 2.07.18 PM.png]
>
> [image: Screenshot 2021-04-08 at 2.07.52 PM.png]
>
> [1] -
> https://debezium.io/documentation/reference/1.0/connectors/postgresql.html#wal-disk-space
>
> Thanks.
>
> On Wed, Apr 7, 2021 at 12:51 PM bat man  wrote:
>
>> Hi there,
>>
>> I am using flink 1.11 and cdc connector 1.1 to stream changes from a
>> postgres table. I see the WAL consumption is increasing gradually even
>> though there are very few writes to the tables.
>>
>> I am using AWS RDS, from [1] I understand that setting the parameter
>> heartbeat.interval.ms solves this WAL consumption issue. However, I
>> tried setting this with no success.
>>
>> I found a bug [2] which seems to take care of committing the LSN so that
>> the db can release the WAL. However, this seems to be fixed only in 1.3, which
>> is compatible with Flink 1.12.1. Is there any way this can be fixed in
>> 1.11.1, since I am using EMR and the latest Flink version available is 1.11?
>>
>>
>> [1] -
>> https://debezium.io/documentation/reference/connectors/postgresql.html
>> [2] - https://github.com/ververica/flink-cdc-connectors/issues/97
>>
>> Thanks.
>> Hemant
>>
>
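
For reference, a rough sketch of how the heartbeat setting is usually passed through to Debezium, assuming the postgres-cdc connector version in use supports 'debezium.*' pass-through options (hostname, credentials and table names below are placeholders; the database name is taken from the pg_replication_slots output above):

CREATE TABLE cdc_source (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'my-rds-host',
  'port' = '5432',
  'username' = 'flink',
  'password' = '******',
  'database-name' = 'test_cdc',
  'schema-name' = 'public',
  'table-name' = 'my_table',
  'decoding.plugin.name' = 'pgoutput',
  'debezium.heartbeat.interval.ms' = '1000'
);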


Hive DDL TBLPROPERTIES not taking effect

2021-04-08 Thread eriendeng
SET table.sql-dialect=hive;
CREATE TABLE hive_catalog.test_db.test_insert_tb (
id BIGINT,
type INT,
user_id BIGINT,
title STRING,
ts TIMESTAMP
) PARTITIONED BY (add_date STRING, add_hour STRING) STORED AS orc
TBLPROPERTIES (
'sink.partition-commit.trigger' = 'process-time',
'sink.partition-commit.delay' = '0s',
'sink.partition-commit.policy.kind' = 'metastore,success-file',
'partition.time-extractor.kind' = 'custom',
'partition.time-extractor.timestamp-pattern' = '$add_date $add_hour',
'partition.time-extractor.class' = '.YMDPartTimeExtractor'
);

I created the table above with the Hive dialect. After starting a job that reads from Kafka and writes into it, none of the parameters in TBLPROPERTIES seem to take effect: the data shows up on HDFS, but Hive has no partitions, and partitioning does not follow my class either (my class turns the partition values into MMdd / HH style date formats and also fixes the +8 timezone). There are no exceptions in the TM or JM logs. Has anyone seen this before? Is some parameter set incorrectly so that none of them take effect?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-08 Thread Yangze Guo
IIUC, your program will eventually create 100 ChildFirstClassLoaders in
a TM, but they should always be GC'ed once the jobs finish. So, as Arvid said,
you'd better check who is still referencing those ChildFirstClassLoaders.


Best,
Yangze Guo

On Thu, Apr 8, 2021 at 5:43 PM 太平洋 <495635...@qq.com> wrote:
>
> My application program looks like this. Does this structure have any problem?
>
> public class StreamingJob {
> public static void main(String[] args) throws Exception {
> int i = 0;
> while (i < 100) {
> try {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> env.setParallelism(Parallelism);
>
> EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner()
> .inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, 
> bsSettings);
>
> bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
> Table t = bsTableEnv.sqlQuery(query);
>
> DataStream points = bsTableEnv.toAppendStream(t, DataPoint.class);
>
> DataStream weightPoints = points.map();
>
> DataStream predictPoints = weightPoints.keyBy()
> .reduce().map();
>
> // side output
> final OutputTag outPutPredict = new 
> OutputTag("predict") {
> };
>
> SingleOutputStreamOperator mainDataStream = predictPoints
> .process();
>
> DataStream exStream = 
> mainDataStream.getSideOutput(outPutPredict);
>
> //write data to clickhouse
> String insertIntoCKSql = "xxx";
> mainDataStream.addSink(JdbcSink.sink(insertIntoCKSql, new CkSinkBuilder(),
> new JdbcExecutionOptions.Builder().withBatchSize(CkBatchSize).build(),
> new 
> JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(CkDriverName)
> .withUrl(CkUrl).withUsername(CkUser).withPassword(CkPassword).build()));
>
> // write data to kafka
> FlinkKafkaProducer producer = new FlinkKafkaProducer<>();
> exStream.map().addSink(producer);
>
> env.execute("Prediction Program");
> } catch (Exception e) {
> e.printStackTrace();
> }
> i++;
> Thread.sleep(window * 1000);
> }
> }
> }
>
>
>
> -- Original message --
> From: "Arvid Heise" ;
> Date: Thursday, April 8, 2021, 2:33 PM
> To: "Yangze Guo";
> Cc:
> "太平洋"<495635...@qq.com>;"user";"guowei.mgw";"renqschn";
> Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>
> Hi,
>
> ChildFirstClassLoader are created (more or less) by application jar and 
> seeing so many looks like a classloader leak to me. I'd expect you to see a 
> new ChildFirstClassLoader popping up with each new job submission.
>
> Can you check who is referencing the ChildFirstClassLoader transitively? 
> Usually, it's some thread that is lingering around because some third party 
> library is leaking threads etc.
>
> OneInputStreamTask is legit and just indicates that you have a job running 
> with 4 slots on that TM. It should not hold any dedicated metaspace memory.
>
> On Thu, Apr 8, 2021 at 4:52 AM Yangze Guo  wrote:
>>
>> I went through the JM & TM logs but could not find any valuable clue.
>> The exception is actually thrown by kafka-producer-network-thread.
>> Maybe @Qingsheng could also take a look?
>>
>>
>> Best,
>> Yangze Guo
>>
>> On Thu, Apr 8, 2021 at 10:39 AM 太平洋 <495635...@qq.com> wrote:
>> >
>> > I have configured it to 512M, but the problem still exists. Now the memory size is
>> > still 256M.
>> > Attachments are TM and JM logs.
>> >
>> > Look forward to your reply.
>> >
>> > -- Original message --
>> > From: "Yangze Guo" ;
>> > Date: Tuesday, April 6, 2021, 6:35 PM
>> > To: "太平洋"<495635...@qq.com>;
>> > Cc: "user";"guowei.mgw";
>> > Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>> >
>> > > I have tried this method, but the problem still exists.
>> > How much memory do you configure for it?
>> >
>> > > is 21 instances of "org.apache.flink.util.ChildFirstClassLoader" normal
>> > Not quite sure about it. AFAIK, each job will have a classloader.
>> > Multiple tasks of the same job in the same TM will share the same
>> > classloader. The classloader will be removed if there is no more task
>> > running on the TM. Classloader without reference will be finally
>> > cleanup by GC. Could you share JM and TM logs for further analysis?
>> > I'll also involve @Guowei Ma in this thread.
>> >
>> >
>> > Best,
>> > Yangze Guo
>> >
>> > On Tue, Apr 6, 2021 at 6:05 PM 太平洋 <495635...@qq.com> wrote:
>> > >
>> > > I have tried this method, but the problem still exists.
>> > > From heap dump analysis, are 21 instances of
>> > > "org.apache.flink.util.ChildFirstClassLoader" normal?
>> > >
>> > >
>> > > -- Original message --
>> > > From: "Yangze Guo" ;
>> > > Date: Tuesday, April 6, 2021, 4:32 PM
>> > > To: "太平洋"<495635...@qq.com>;
>> > > Cc: "user";
>> > > Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>> > >
>> > > I think you can try to increase the JVM metaspace option for
>> > > TaskManagers through taskmanager.memory.jvm-metaspace.size.

Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-08 Thread 太平洋
My application program looks like this. Does this structure have any problem?

public class StreamingJob {
public static void main(String[] args) throws Exception {
int i = 0;
while (i < 100) {
try {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setParallelism(Parallelism);

EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner()
.inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env,
bsSettings);

bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
Table t = bsTableEnv.sqlQuery(query);

DataStream points = bsTableEnv.toAppendStream(t, DataPoint.class);

DataStream weightPoints = points.map();

DataStream predictPoints = weightPoints.keyBy()
.reduce().map();

// side output
final OutputTag outPutPredict = new
OutputTag("predict") {
};

SingleOutputStreamOperator mainDataStream = predictPoints
.process();

DataStream exStream =
mainDataStream.getSideOutput(outPutPredict);

//write data to clickhouse
String insertIntoCKSql = "xxx";
mainDataStream.addSink(JdbcSink.sink(insertIntoCKSql, new CkSinkBuilder(),
new JdbcExecutionOptions.Builder().withBatchSize(CkBatchSize).build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(CkDriverName)
.withUrl(CkUrl).withUsername(CkUser).withPassword(CkPassword).build()));

// write data to kafka
FlinkKafkaProducer producer = new FlinkKafkaProducer<>();
exStream.map().addSink(producer);

env.execute("Prediction Program");
} catch (Exception e) {
e.printStackTrace();
}
i++;
Thread.sleep(window * 1000);
}
}
}


SinkFunction or OutputFormat: which one to choose?

2021-04-08 Thread Luna Wong
When implementing a custom connector, you can implement either RichSinkFunction or RichOutputFormat. As a developer, how should I choose between them?


Why does flink-quickstart-scala suggest adding connector dependencies in the default scope, while Flink Hive integration docs suggest the opposite

2021-04-08 Thread Yik San Chan
The question is cross-posted on Stack Overflow
https://stackoverflow.com/questions/67001326/why-does-flink-quickstart-scala-suggests-adding-connector-dependencies-in-the-de
.

## Connector dependencies should be in default scope

This is what [flink-quickstart-scala](
https://github.com/apache/flink/blob/d12eeedfac6541c3a0711d1580ce3bd68120ca90/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml#L84)
suggests:

```



```

It also aligns with [Flink project configuration](
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies
):

> We recommend packaging the application code and all its required
dependencies into one jar-with-dependencies which we refer to as the
application jar. The application jar can be submitted to an already running
Flink cluster, or added to a Flink application container image.
>
> Important: For Maven (and other build tools) to correctly package the
dependencies into the application jar, these application dependencies must
be specified in scope compile (unlike the core dependencies, which must be
specified in scope provided).

## Hive connector dependencies should be in provided scope

However, [Flink Hive Integration docs](
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#program-maven)
suggests the opposite:

> If you are building your own program, you need the following dependencies
in your mvn file. It’s recommended not to include these dependencies in the
resulting jar file. You’re supposed to add dependencies as stated above at
runtime.

## Why?

Thanks!

Best,
Yik San


flink cdc reading Maxwell-format binlog: how to get the table name and other metadata

2021-04-08 Thread chen310
A question: I read this article https://developer.aliyun.com/article/771438 about flink-cdc reading MySQL
binlog in Maxwell format. How can I get the MySQL table name in the Flink source table? Declaring it like this did not work:  `origin_table` STRING
METADATA FROM 'value.source.table' VIRTUAL,

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html


 



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 1.13 and CSV (batch) writing

2021-04-08 Thread Flavio Pompermaier
Any help here? Moreover, if I use the DataStream API there's no left/right
outer join yet... are those meant to be added in Flink 1.13 or 1.14?

On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier 
wrote:

> Hi to all,
> I'm testing writing to a CSV using Flink 1.13 and I get the following
> error:
>
> The matching candidates:
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> Unsupported property keys:
> format.quote-character
>
> I create the table env using this:
>
> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
> .useBlinkPlanner()//
> // .inBatchMode()//
> .inStreamingMode()//
> .build();
> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>
> The error is the same both with inBatchMode and inStreamingMode.
> Is this really not supported or am I using the wrong API?
>
> Best,
> Flavio
>
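
For comparison, a rough sketch of a CSV sink defined through the newer DDL-based filesystem connector, where the quote character goes through the 'csv.quote-character' format option rather than 'format.quote-character' (path and schema below are placeholders):

CREATE TABLE csv_output (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/output',
  'format' = 'csv',
  'csv.quote-character' = '"'
);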


Re: flink cdc postgres connector 1.1 - heartbeat.interval.ms - WAL consumption

2021-04-08 Thread bat man
Has anyone faced similar issues with CDC and Postgres?

I see that restart_lsn and confirmed_flush_lsn have stayed constant since the
snapshot replication records were streamed, even though I have tried inserting
a record into the whitelisted table.

select * from pg_replication_slots;
 slot_name   |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-------------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
 stream_cdc3 | pgoutput | logical   |  16411 | test_cdc | f         | t      |       1146 |      |         6872 | 62/34000828 | 62/34000860

I have passed heartbeat.interval.ms = 1000 and can see the heartbeat events
streamed to Flink; however, the transaction log disk usage and the oldest
replication slot lag keep increasing. From [1] I have also tried this -

For other decoder plug-ins, it is recommended to create a supplementary
table that is not monitored by Debezium.

A separate process would then periodically update the table (either
inserting a new event or updating the same row all over). PostgreSQL then
will invoke Debezium which will confirm the latest LSN and allow the
database to reclaim the WAL space.

[image: Screenshot 2021-04-08 at 2.07.18 PM.png]

[image: Screenshot 2021-04-08 at 2.07.52 PM.png]

[1] -
https://debezium.io/documentation/reference/1.0/connectors/postgresql.html#wal-disk-space

Thanks.

On Wed, Apr 7, 2021 at 12:51 PM bat man  wrote:

> Hi there,
>
> I am using flink 1.11 and cdc connector 1.1 to stream changes from a
> postgres table. I see the WAL consumption is increasing gradually even
> though there are very few writes to the tables.
>
> I am using AWS RDS, from [1] I understand that setting the parameter
> heartbeat.interval.ms solves this WAL consumption issue. However, I tried
> setting this with no success.
>
> I found a bug [2] which seems to take care of committing the LSN so that
> the db can release the WAL. However, this seems to be fixed only in 1.3, which
> is compatible with Flink 1.12.1. Is there any way this can be fixed in
> 1.11.1, since I am using EMR and the latest Flink version available is 1.11?
>
>
> [1] -
> https://debezium.io/documentation/reference/connectors/postgresql.html
> [2] - https://github.com/ververica/flink-cdc-connectors/issues/97
>
> Thanks.
> Hemant
>


Re: flink 1.12.0 FileSystem reading HDFS files

2021-04-08 Thread Rui Li
Hi,

tableEnv.executeSql runs DQL and DML statements asynchronously. To wait for the job to finish, call await() on the returned TableResult, i.e. tableEnv.executeSql(...).await().
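
A minimal sketch of that change, applied to the code below (only the last statement differs; await() may throw checked exceptions that need handling):

// Blocks until the INSERT job finishes, so the print sink has produced
// its output before the program exits.
tableEnv.executeSql(insertSql).await();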

On Wed, Apr 7, 2021 at 3:55 PM Ink足迹  wrote:

> In Flink 1.12.0 I'm using the Filesystem connector to read ORC files from HDFS,
> but no data gets printed out. What could be causing this?
>
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> TableEnvironment tableEnv = TableEnvironment.create(settings);
> String createSql = "CREATE TABLE test_jds_table (" +
> "  dc__key Int," +
> "  name String," +
> "  c4 String," +
> "  dc__time String" +
> ") WITH (" +
> " 'connector' = 'filesystem'," +
> " 'path' =
> 'hdfs://HDFS41645/usr/hive/warehouse/tmp.db/test_jds_table_d839cd09f5cc44dba8f45eee0d282ee3',"
> +
> " 'format' = 'orc'" +
> ")";
> String createPrintTableSql = "CREATE TABLE print_table WITH ('connector' =
> 'print') LIKE test_jds_table (EXCLUDING ALL)";
> String insertSql = "insert into print_table select * from
> `test_jds_table`";
> tableEnv.executeSql(createSql);
> tableEnv.executeSql(createPrintTableSql);
> tableEnv.executeSql(insertSql);



-- 
Best regards!
Rui Li


Re: Exception when using the Kafka data source JAR

2021-04-08 Thread Qingsheng Ren
Hi,

From the error, the Flink Kafka connector classes are missing from the job JAR. Please check whether the Flink Kafka
connector classes were actually packaged into the JAR; referencing the Kafka connector in the Maven POM does not necessarily mean it ends up in the job JAR.

--
Best Regards,

Qingsheng Ren
Real-time Computing Department, Alibaba Cloud
Alibaba Group
Email: renqs...@gmail.com


On Apr 7, 2021, 3:27 PM +0800, 小猫爱吃鱼 <1844061...@qq.com> wrote:
> Hi,
>   While using flink-1.13, I tried to use a Kafka data source.
>
> I modified the streaming WordCount from flink-examples so that it reads data from a local Kafka; run directly from IDEA it works fine. But when the JAR built with mvn is submitted to a locally built Flink
> binary (a locally started standalone Flink), the following exception is thrown.
>
>
> java.lang.RuntimeException: Could not look up the main(String[]) method from 
> the class org.apache.flink.streaming.examples.wordcount.WordCount2: 
> org/apache/flink/stream
> ing/connectors/kafka/KafkaDeserializationSchema
> at 
> org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:315)
> at org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:161)
> at org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:65)
> at 
> org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)
> at org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:851)
> at 
> org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:271)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
> at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: java.lang.NoClassDefFoundError: 
> org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema
> at java.lang.Class.getDeclaredMethods0(Native Method)
> at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
> at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
> at java.lang.Class.getMethod0(Class.java:3018)
> at java.lang.Class.getMethod(Class.java:1784)
> at 
> org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:307)
> ... 10 more
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
> at 
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
> at 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> ... 16 more
>
>
> The fix I found suggests changing the corresponding dependency in the pom, but I don't understand how to do it. I found the kafka-connector dependency in the pom of flink-examples-streaming, while the parent pom has no such dependency. How should I deal with this?
> I can guarantee that the pom only contains changes for the newly added example project; no other dependencies were modified.
>
>
> Many thanks!


Re: FLINK Kinesis consumer Checkpointing data loss

2021-04-08 Thread Arvid Heise
Hi Vijay,

edit: After re-reading your message: are you sure that you restart from a
checkpoint/savepoint? If you just start the application anew and use LATEST
initial position, this is the expected behavior.

--- original intended answer if you restart from checkpoint

this is definitively not the expected behavior.

To exclude certain error sources:
- Could you double-check if this is also happening if you don't use
unaligned checkpoints? (I don't really think this is because of unaligned
checkpoint, but it's better to be sure and we want to reduce the possible
error sources)
- Can you see the missing messages still in Kinesis?
- Could you extract all log INFO statements from
org.apache.flink.streaming.connectors.kinesis and attach them here?
- How long did you wait with recovery?



On Wed, Apr 7, 2021 at 8:03 PM Vijayendra Yadav 
wrote:

> Hi Team,
>
> We are trying to make sure we are not losing data when KINESIS Consumer is
> down.
>
> Kinesis streaming Job which has following checkpointing properties:
>
>
> *// checkpoint every X msecs
> env.enableCheckpointing(Conf.getFlinkCheckpointInterval());*
>
>
>
>
>
>
>
>
> *// enable externalized checkpoints which are retained after job
> cancellation
> env.getCheckpointConfig().enableExternalizedCheckpoints(
> CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
>   );// allow job recovery fallback to checkpoint when there is a more
> recent savepoint
> env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());
>  //
> enables the experimental unaligned checkpoints
> env.getCheckpointConfig().enableUnalignedCheckpoints();*
>
> *//checkpointpath*
> *env.setStateBackend(new
> FsStateBackend(Conf.getFlinkCheckPointPath(), true));*
>
> 1) We killed the Kinesis Job
> 2) Sent messages to KDS while Consumer was down.
> 3) Restarted Flink Consumer, *messages which were sent during the
> Consumer down period, never ingested (data loss).*
> 4) Re-sent messages to KDS while the consumer was still up. Messages did
> ingest fine.
>
> *How can I avoid data loss for #3 ??*
>
> From Logs:
>
>
> *2021-04-07 12:15:49,161 INFO
>  org.apache.flink.runtime.jobmaster.JobMaster  - Using
> application-defined state backend: File State Backend (checkpoints:
> 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous:
> TRUE, fileStateThreshold: -1)*
>
>
>
> *2021-04-07 12:16:02,343 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591
> ms).2021-04-07 12:16:11,951 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job
> 8943d16e22b8aaf65d6b9e2b8bd54113.2021-04-07 12:16:12,483 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411
> ms).*
>
> Thanks,
> Vijay
>


Re: SingleValueAggFunction received more than one element error with LISTAGG

2021-04-08 Thread Timo Walther

Hi,

which Flink version are you using?

Could you also share the resulting plan with us using 
`TableEnvironment.explainSql()`?


Thanks,
Timo
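
For reference, a minimal way to produce that plan, assuming tableEnv is the TableEnvironment running the statement (the SQL string is abbreviated here):

// Prints the abstract syntax tree and the optimized physical plan of the query.
String query = "SELECT node.nid, (SELECT LISTAGG(DISTINCT(TDC.name)) FROM ... ) AS characteristics FROM node";
System.out.println(tableEnv.explainSql(query));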


On 07.04.21 17:29, soumoks123 wrote:

I receive the following error when trying to use the LISTAGG function in
Table API.


java.lang.RuntimeException: SingleValueAggFunction received more than one
element.
 at GroupAggsHandler$1460.accumulate(Unknown Source)
 at
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:158)
 at
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
 at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
 at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
 at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
 at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
 at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
 at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
 at java.lang.Thread.run(Thread.java:748)


This is my query,

SELECT node.nid,
(SELECT LISTAGG(DISTINCT(TDC.name)) FROM M_TermNode TNC JOIN M_TermData TDC
ON TNC.tid = TDC.tid WHERE node.nid = TNC.nid AND TDC.vid = (SELECT
Vocab.vid from M_Vocabulary Vocab where Vocab.`module` = 'extra_fields')) AS
characteristics
FROM node


The above query looks slightly complex but essentially boils down to a group
by on TDC.vid. This may return more than one value for TDC.name which needs
to be concatenated into the same string.


I have tried removing the DISTINCT clause inside LISTAGG but to no avail.

The same query works on a MySQL DB with GROUP_CONCAT instead of LISTAGG.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Compression with rocksdb backed state

2021-04-08 Thread Timo Walther

Hi Deepthi,

1. Correct
2. Correct
3. Incremental snapshots simply manage references to RocksDB's sstables. 
You can find a full explanation here [1]. Thus, the payload is a 
blackbox for Flink and Flink's compression flag has no impact. So we 
fully rely what RocksDB offers.

4. Correct

I hope this helps.

Regards,
Timo


[1] 
https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
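
For point 1, a minimal sketch of turning the flag on; this is the switch the documentation refers to, and it only affects full snapshots of keyed state, not RocksDB incremental checkpoints:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enables (snappy) compression for keyed state in full checkpoints and savepoints.
env.getConfig().setUseSnapshotCompression(true);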



On 07.04.21 22:04, deepthi Sridharan wrote:
I am trying to understand this section on compression of checkpoints 
which has me a bit confused 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression 



Could you please confirm if my understanding is correct:
1. Compression is disabled by default for full snapshots and needs to be 
turned on if required.
2. Current versions do not support changing the compression type (snappy 
by default) when enabled. Future versions will support configuring the 
compression type.

3. Compression is enabled by default for incremental snapshot.
4. The compression type cannot be configured for incremental savepoints.

--
Regards,
Deepthi




Re: Flink 1.12.2 sql api use parquet format error

2021-04-08 Thread Timo Walther

Hi,

can you check the content of the JAR file that you are submitting? There 
should be a `META-INF/services` directory with a 
`org.apache.flink.table.factories.Factory` file that should list the 
Parque format.


See also here:

https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/overview/#transform-table-connectorformat-resources

Regards,
Timo
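
As a quick runtime check of what actually ends up on the classpath, a small diagnostic sketch (it uses the Factory SPI of the 1.11+ table factory stack; the exact class name of the Parquet factory may differ by version):

import java.util.ServiceLoader;
import org.apache.flink.table.factories.Factory;

public class ListTableFactories {
    public static void main(String[] args) {
        // Lists every table factory registered via META-INF/services on the classpath.
        // If the Parquet format JAR was packaged correctly, an entry with the
        // identifier "parquet" should show up in the output.
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            System.out.println(factory.factoryIdentifier() + " -> " + factory.getClass().getName());
        }
    }
}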


On 06.04.21 10:25, ?? wrote:

ref: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/parquet.html

env and error:

Flink version: 1.12.2
deployment: standalone kubernetes session
dependency:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-parquet_2.11</artifactId>
  <version>1.12.2</version>
</dependency>

Error:
Caused by: org.apache.flink.table.api.ValidationException: Could not
find any format factory for identifier 'parquet' in the classpath.
  at org.apache.flink.table.filesystem.FileSystemTableSink.<init>(FileSystemTableSink.java:124)
  at org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSink(FileSystemTableFactory.java:85)






Re: Zigzag shape in TM JVM used memory

2021-04-08 Thread Piotr Nowojski
Hi,

I don't think there is a Flink specific answer to this question. Just do
what you would normally do with a normal Java application running inside a
JVM. If there is an OOM on heap space, you can either try to bump the heap
space, or reduce usage of it. The only Flink specific part is probably that
you need to leave enough memory for the framework itself, and that there
are a couple of different memory pools. You can read about those things in
the docs:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_tuning.html
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_trouble.html

Piotrek



Thu, Apr 8, 2021 at 02:19 Lu Niu  wrote:

> Hi, Piotr
>
> Thanks for replying. I asked this because such a pattern might imply memory
> oversubscription. For example, I tuned down the memory of one app from heap
> 2.63GB to 367MB and the job still runs fine:
> before:
>
> https://drive.google.com/file/d/1o8k9Vv3yb5gXITi4GvmlXMteQcRfmOhr/view?usp=sharing
>
> after:
>
> https://drive.google.com/file/d/1wNTHBT8aSJaAmL1rVY8jUkdp-G5znnMN/view?usp=sharing
>
>
> What's the best practice for tuning Flink job memory?
>
> 1. What’s a good start point users should try first?
> 2. How to make progress? e.g. flink application Foo currently encountered
> error OOM: java heap space. Where to move next? simply bump up
> taskmananger.memory? or just increase heap?
> 3. What’s the final state? Job running fine and ensuring XYZ headroom in
> each memory component?
>
> Best
> Lu
>
> On Tue, Apr 6, 2021 at 12:26 AM Piotr Nowojski 
> wrote:
>
> > Hi,
> >
> > this should be posted on the user mailing list not the dev.
> >
> > Apart from that, this looks like normal/standard behaviour of JVM, and
> has
> > very little to do with Flink. Garbage Collector (GC) is kicking in when
> > memory usage is approaching some threshold:
> > https://www.google.com/search?q=jvm+heap+memory+usage=isch
> >
> > Piotrek
> >
> >
> Mon, Apr 5, 2021 at 22:54 Lu Niu  wrote:
> >
> > > Hi,
> > >
> > > we need to update our email system then :) . Here are the links:
> > >
> > >
> > >
> >
> https://drive.google.com/file/d/1lZ5_P8_NqsN1JeLzutGj4DxkyWJN75mR/view?usp=sharing
> > >
> > >
> > >
> >
> https://drive.google.com/file/d/1J6c6rQJwtDp1moAGlvQyLQXTqcuG4HjL/view?usp=sharing
> > >
> > >
> > >
> >
> https://drive.google.com/file/d/1-R2KzsABC471AEjkF5qTm5O3V47cpbBV/view?usp=sharing
> > >
> > > All are DataStream job.
> > >
> > > Best
> > > Lu
> > >
> > > On Sun, Apr 4, 2021 at 9:17 PM Yun Gao  wrote:
> > >
> > > >
> > > > Hi Lu,
> > > >
> > > > The image seems not be able to shown due to the mail server
> limitation,
> > > > could you upload it somewhere and paste the link here ?
> > > >
> > > > Logically, I think the zigzag is usually due to small objects being
> > > > created and then collected soon afterwards on the heap. Are you running
> > > > a SQL job or a DataStream job?
> > > >
> > > > Best,
> > > > Yun
> > > >
> > > > --
> > > > Sender:Lu Niu
> > > > Date:2021/04/05 12:06:24
> > > > Recipient:d...@flink.apache.org
> > > > Theme:Zigzag shape in TM JVM used memory
> > > >
> > > > Hi, Flink dev
> > > >
> > > > We observed that the TM JVM used memory metric shows zigzag shape
> among
> > > > lots of our applications, although these applications are quite
> > different
> > > > in business logic. The upper bound is close to the max heap size. Is
> > this
> > > > expected in flink application? Or does flink internally
> > > > aggressively pre-allocate memory?
> > > >
> > > > app1
> > > > [image: Screen Shot 2021-04-04 at 8.46.45 PM.png]
> > > > app2
> > > > [image: Screen Shot 2021-04-04 at 8.45.35 PM.png]
> > > > app3
> > > > [image: Screen Shot 2021-04-04 at 8.43.53 PM.png]
> > > >
> > > > Best
> > > > Lu
> > > >
> > > >
> > >
> >
>


Re: flink connector username and password

2021-04-08 Thread Michael Ran
It doesn't seem to be supported out of the box. If you want it, add a parameter for the password that handles encryption/decryption, then rebuild the package.
On 2021-04-02 16:58:30, "guoyb" <861277...@qq.com> wrote:
>For example, the JDBC connector for MySQL
>
>
>create table xxx
>()
>with(
>" user name"=" root"
>," password"="123456"
>)
>;
>How can the username and password be configured so they are not in plain text like this?


Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-08 Thread Arvid Heise
Hi,

ChildFirstClassLoader are created (more or less) by application jar and
seeing so many looks like a classloader leak to me. I'd expect you to see a
new ChildFirstClassLoader popping up with each new job submission.

Can you check who is referencing the ChildFirstClassLoader transitively?
Usually, it's some thread that is lingering around because some third party
library is leaking threads etc.

OneInputStreamTask is legit and just indicates that you have a job running
with 4 slots on that TM. It should not hold any dedicated metaspace memory.

On Thu, Apr 8, 2021 at 4:52 AM Yangze Guo  wrote:

> I went through the JM & TM logs but could not find any valuable clue.
> The exception is actually thrown by kafka-producer-network-thread.
> Maybe @Qingsheng could also take a look?
>
>
> Best,
> Yangze Guo
>
> On Thu, Apr 8, 2021 at 10:39 AM 太平洋 <495635...@qq.com> wrote:
> >
> > I have configured it to 512M, but the problem still exists. Now the memory size
> > is still 256M.
> > Attachments are TM and JM logs.
> >
> > Look forward to your reply.
> >
> > -- Original message --
> > From: "Yangze Guo" ;
> > Date: Tuesday, April 6, 2021, 6:35 PM
> > To: "太平洋"<495635...@qq.com>;
> > Cc: "user";"guowei.mgw";
> > Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
> >
> > > I have tried this method, but the problem still exists.
> > How much memory do you configure for it?
> >
> > > is 21 instances of "org.apache.flink.util.ChildFirstClassLoader" normal
> > Not quite sure about it. AFAIK, each job will have a classloader.
> > Multiple tasks of the same job in the same TM will share the same
> > classloader. The classloader will be removed if there is no more task
> > running on the TM. Classloader without reference will be finally
> > cleanup by GC. Could you share JM and TM logs for further analysis?
> > I'll also involve @Guowei Ma in this thread.
> >
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Apr 6, 2021 at 6:05 PM 太平洋 <495635...@qq.com> wrote:
> > >
> > > I have tried this method, but the problem still exists.
> > > From heap dump analysis, are 21 instances of
> > > "org.apache.flink.util.ChildFirstClassLoader" normal?
> > >
> > >
> > > -- Original message --
> > > From: "Yangze Guo" ;
> > > Date: Tuesday, April 6, 2021, 4:32 PM
> > > To: "太平洋"<495635...@qq.com>;
> > > Cc: "user";
> > > Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
> > >
> > > I think you can try to increase the JVM metaspace option for
> > > TaskManagers through taskmanager.memory.jvm-metaspace.size. [1]
> > >
> > > [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-metaspace
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > Best,
> > > Yangze Guo
> > >
> > >
> > > On Tue, Apr 6, 2021 at 4:22 PM 太平洋 <495635...@qq.com> wrote:
> > > >
> > > > batch job:
> > > > read data from s3 by sql, then through some operators, and write data to
> > > > clickhouse and kafka.
> > > > after some time, the task manager quits with OutOfMemoryError: Metaspace.
> > > >
> > > > env:
> > > > flink version:1.12.2
> > > > task-manager slot count: 5
> > > > deployment: standalone kubernetes session mode
> > > > dependencies:
> > > >
> > > > <dependency>
> > > >   <groupId>org.apache.flink</groupId>
> > > >   <artifactId>flink-connector-kafka_2.11</artifactId>
> > > >   <version>${flink.version}</version>
> > > > </dependency>
> > > >
> > > > <dependency>
> > > >   <groupId>com.google.code.gson</groupId>
> > > >   <artifactId>gson</artifactId>
> > > >   <version>2.8.5</version>
> > > > </dependency>
> > > >
> > > > <dependency>
> > > >   <groupId>org.apache.flink</groupId>
> > > >   <artifactId>flink-connector-jdbc_2.11</artifactId>
> > > >   <version>${flink.version}</version>
> > > > </dependency>
> > > >
> > > > <dependency>
> > > >   <groupId>ru.yandex.clickhouse</groupId>
> > > >   <artifactId>clickhouse-jdbc</artifactId>
> > > >   <version>0.3.0</version>
> > > > </dependency>
> > > >
> > > > <dependency>
> > > >   <groupId>org.apache.flink</groupId>
> > > >   <artifactId>flink-parquet_2.11</artifactId>
> > > >   <version>${flink.version}</version>
> > > > </dependency>
> > > >
> > > > <dependency>
> > > >   <groupId>org.apache.flink</groupId>
> > > >   <artifactId>flink-json</artifactId>
> > > >   <version>${flink.version}</version>
> > > > </dependency>
> > > >
> > > > heap dump1:
> > > >
> > > > Leak Suspects
> > > >
> > > > System Overview
> > > >
> > > >  Leaks
> > > >
> > > >  Overview
> > > >
> > > >
> > > >   Problem Suspect 1
> > > >
> > > > 21 instances of "org.apache.flink.util.ChildFirstClassLoader",
> loaded by "sun.misc.Launcher$AppClassLoader @ 0x73b2d42e0" occupy
> 29,656,880 (41.16%) bytes.
> > > >
> > > > Biggest instances:
> > > >
> > > > org.apache.flink.util.ChildFirstClassLoader @ 0x73ca2a1e8 -
> 1,474,760 (2.05%) bytes.
> > > > org.apache.flink.util.ChildFirstClassLoader @ 0x73d2af820 -
> 1,474,168 (2.05%) bytes.
> > > > org.apache.flink.util.ChildFirstClassLoader @ 0x73cdcaa10 -
> 1,474,160 (2.05%) bytes.
> > > > org.apache.flink.util.ChildFirstClassLoader @ 0x73cf6aab0 -
> 1,474,160 (2.05%) bytes.
> > > > 

Re: Async + Broadcast?

2021-04-08 Thread Arvid Heise
Hi Alex,

your approach is completely valid. What you want to achieve is that you
have a chain between your state managing operator and the consuming async
operations. In that way, you have no serialization overhead.

To achieve that you want to
- use Flink 1.11+ [1]
- make sure that if you have a legacy source, you disableChaining before
your state managing operator as asyncIO cannot be (transitively) chained to
legacy sources. So it should be source -> ... -> (forward channel) ->
(state managing operator -> async1 -> async2 -> ... ) ... -> sink
- enableObjectReuse [2] to avoid copying of objects

[1] https://issues.apache.org/jira/browse/FLINK-16219
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/execution_configuration.html
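
A minimal, self-contained sketch of that wiring (the source, async function and data types below are placeholders; a real job would have its own operators between the source and the async stage):

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class ChainedAsyncSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse(); // avoid copying records between chained operators

        // Placeholder source; with a legacy source, disableChaining() breaks the chain
        // so that the async operator is not chained to it.
        DataStream<String> records = env.fromElements("a", "b", "c").disableChaining();

        // Async stage directly downstream of the record-producing operator.
        DataStream<String> enriched = AsyncDataStream.orderedWait(
                records,
                new RichAsyncFunction<String, String>() {
                    @Override
                    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
                        // Hypothetical async lookup; completed immediately for the sketch.
                        resultFuture.complete(Collections.singleton(input + "-enriched"));
                    }
                },
                30, TimeUnit.SECONDS, 100);

        enriched.print();
        env.execute("async chaining sketch");
    }
}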

On Thu, Apr 8, 2021 at 3:26 AM Alex Cruise  wrote:

> Thanks Austin! I'll proceed with my idea, and keep the bootstrap config :)
>
> -0xe1a
>
> On Wed, Apr 7, 2021 at 2:18 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey Alex,
>>
>> I'm not sure if there is a best practice here, but what I can tell you is
>> that I worked on a job that did exactly what you're suggesting with a
>> non-async operator to create a [record, config] tuple, which was then
>> passed to the async stage. Our config objects were also not tiny (~500kb)
>> and our pipeline not huge (~1M records/day and 1GB data/ day), but this
>> setup worked quite well. I'd say if latency isn't your most important
>> metric, or if your pipeline is a similar size, the ease of async IO is
>> worth it.
>>
>> One thing you'll have to look out for (if you haven't already) is
>> bootstrapping the config objects when the job starts, since the broadcast
>> from the polling source can happen later than recieving the first record –
>> we solved this by calling the polling source's service in the `open()`
>> method of the non-async operator and storing the initial configs in memory.
>>
>> Hope that helps a bit,
>> Austin
>>
>> On Wed, Apr 7, 2021 at 4:39 PM Alex Cruise  wrote:
>>
>>> Hi folks,
>>>
>>> I have a somewhat complex Flink job that has a few async stages, and a
>>> few stateful stages. It currently loads its configuration on startup, and
>>> doesn't attempt to refresh it.
>>>
>>> Now I'm working on dynamic reconfiguration. I've written a polling
>>> source which sends a configuration snapshot whenever anything has changed,
>>> I've set up a broadcast of that source, and I'm updating the operators in
>>> the data (i.e. not config) stream to be BroadcastProcessFunctions. But now
>>> I've reached the first async operator, and I recall that async functions
>>> aren't allowed to be stateful.
>>>
>>> I've tried to find a best practice for this situation, without much
>>> luck. My best idea so far is to insert a new stage before the async one,
>>> which would tuple up each record with its corresponding config snapshot
>>> from the most recent broadcast state. This would increase the amount of
>>> data that needs to be serialized, and some of the configs are quite large,
>>> but would allow me to continue using async IO.
>>>
>>> Any suggestions?
>>>
>>> Thanks!
>>>
>>> -0xe1a
>>>
>>


Unsubscribe

2021-04-08 Thread 郭天琦
Unsubscribe

FlinkSQL + Python UDF issue (flink-1.12.2)

2021-04-08 Thread 岳江帆
After running for a while this problem always shows up. I'm using SQL's efficient deduplication, and I'm not sure whether that is the cause.
SQL:
CREATE VIEW AllProcessView (
`partition`,
`data`,
proct_time
)
AS
  SELECT
combine(data) AS `partition`,
`data`,
proct_time
  FROM HidsLogTable
  WHERE idx = 'c7';


INSERT INTO
  IDSEngineData
SELECT T.jsonData
FROM
 (SELECT `data`
FROM (
   SELECT `data`,
 ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY proct_time ASC ) 
AS rownum
   FROM AllProcessView)
WHERE rownum = 1),
LATERAL TABLE(toKafka('c7', `data`)) as T(`jsonData`);


Error:
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught 
exception while processing timer.
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1205)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1181)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1320)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1309)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) 
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) 
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) 
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_66]
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException: 
java.lang.NullPointerException
... 11 more
Caused by: java.lang.NullPointerException
at 
org.apache.flink.table.runtime.operators.python.table.RowDataPythonTableFunctionOperator.emitResult(RowDataPythonTableFunctionOperator.java:157)
 ~[flink-sql-submit.jar:?]
at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:250)
 ~[flink-sql-submit.jar:?]
at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:273)
 ~[flink-sql-submit.jar:?]
at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:265)
 ~[flink-sql-submit.jar:?]
at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:125)
 ~[flink-sql-submit.jar:?]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1318)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]



Unsubscribe

2021-04-08 Thread Gauler Tan
Unsubscribe