Re: Timeout settings for Flink jobs?

2021-10-11 Thread Sharon Xie
Thanks Caizhi, our use case is for testing - before running a production
job, we want to “dry run” it for a while and output the result to a temp
location where we can verify the output. We are running a streaming job but
the use case sounds more like a batch job.

On Mon, Oct 11, 2021 at 7:34 PM Caizhi Weng  wrote:

> Hi!
>
> There is currently no such setting. You need to rely on an external system
> to read the execution time (from Flink's job information, see [1]) and
> cancel the job once it exceeds the time limit.
>
> Could you elaborate more on your use case? Are you running a streaming job
> or a batch job? For streaming jobs it is rare to have a time limit.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid
>
> Sharon Xie  wrote on Tue, Oct 12, 2021 at 3:42 AM:
>
>> Hi there,
>>
>> We have a use case where we want to terminate a job when a time limit
>> is reached. Is there a Flink setting that we can use for this use case?
>>
>>
>> Thanks,
>> Sharon
>>
>


Re: flink-1.14 KafkaSource watermark question

2021-10-11 Thread JasonLee
Hi


The window only fires when wm > window.end_time. Check whether the wm is
advancing as expected.


Best
JasonLee


On 2021-10-12 11:26, kcz <573693...@qq.com.INVALID> wrote:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource

Re: Does Flink SQL plan to support a MySQL catalog?

2021-10-11 Thread Roc Marshal
Hi 旭晨,
This feature is already being worked on.
Reviews / discussion / improvements are welcome: https://github.com/apache/flink/pull/16962

Best regards.
Roc.



Sent from NetEase Mail Master




---- Original message ----
| From | 赵旭晨 |
| Date | October 12, 2021 10:17 |
| To | user-zh@flink.apache.org |
| Cc | |
| Subject | Does Flink SQL plan to support a MySQL catalog? |
Currently Flink's JdbcCatalog only supports PostgreSQL. Are there plans to support MySQL? Since our company's metadata storage is standardized on MySQL, it is unlikely that we can also introduce PostgreSQL. Or, to put the question the other way around: why hasn't the Flink community supported a MySQL catalog so far? Are there any concerns?

flink-1.14 KafkaSource watermark question

2021-10-11 Thread kcz
 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource

Re: Yarn job not exit when flink job exit

2021-10-11 Thread Yangze Guo
Hi, Jake

In Flink 1.14, we recommend using "-t yarn-per-job" [1] to start a per-job
cluster. Regarding your issue, I could not reproduce it with the WordCount
example. However, I suspect this is not the right way to run Flink's SQL
client, which might be the root cause of your issue. Would you like to take
a look at [2]?

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#per-job-cluster-mode
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/

Best,
Yangze Guo

On Tue, Oct 12, 2021 at 10:52 AM Caizhi Weng  wrote:
>
> Hi!
>
> yarn-cluster is the mode for a yarn session cluster, which means the cluster 
> will remain even after the job is finished. If you want to finish the Flink 
> job as well as the yarn job, use yarn-per-job mode instead.
>
> Jake  wrote on Sat, Oct 9, 2021 at 5:53 PM:
>>
>> Hi
>>
>> When submitting a job in yarn-cluster mode, the Flink job finishes but the
>> YARN job does not exit. What should I do?
>>
>> Submit command:
>>
>> /opt/app/flink/flink-1.14.0/bin/flink run -m yarn-cluster 
>> ./flink-sql-client.jar --file dev.sql


Inconsistent parallelism in web UI when using reactive mode

2021-10-11 Thread 陳昌倬
Hi,

We found that the parallelism shown in the web UI is inconsistent when using
reactive mode. As shown in the attachment, on the overview page all
parallelism values are 1, which is not correct. When clicking an operator for
detailed information, the parallelism shown there is correct.

Is it possible to fix this inconsistency so that it does not confuse
engineers deploying Flink applications?


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B




Re: Async I/O operator cannot complete checkpoints

2021-10-11 Thread 李一飞
The image has been uploaded as an attachment.

On 2021-10-12 10:33:12, "李一飞" wrote:

The async I/O operator cannot complete checkpoints. Could you help me figure out what the cause is?




 

Re: Checkpoint size increasing even though incremental checkpointing is enabled

2021-10-11 Thread Caizhi Weng
Hi!

Checkpoint sizes are highly related to your job. Incremental checkpointing
will help only when the values in the state are converging (for example a
distinct count aggregation).

If possible, could you provide your user code or explain what jobs you are
running?

Lei Wang  wrote on Mon, Oct 11, 2021 at 4:16 PM:

>
> [image: image.png]
>
> The checkpointed data size became bigger and bigger, and the node CPU is
> very high when the job is doing checkpointing. But I have enabled
> incremental checkpointing: env.setStateBackend(new RocksDBStateBackend(checkpointDir, true));
>
> I am using flink-1.11.2 and aliyun oss as checkpoint storage.
>
>
> Any insight on this?
>
> Thanks,
>
> Lei
>
>
>


Re: Yarn job not exit when flink job exit

2021-10-11 Thread Caizhi Weng
Hi!

yarn-cluster is the mode for a yarn session cluster, which means the
cluster will remain even after the job is finished. If you want to finish
the Flink job as well as the yarn job, use yarn-per-job mode instead.

Jake  wrote on Sat, Oct 9, 2021 at 5:53 PM:

> Hi
>
> When submitting a job in yarn-cluster mode, the Flink job finishes but the
> YARN job does not exit. What should I do?
>
> Submit command:
>
> /opt/app/flink/flink-1.14.0/bin/flink run -m yarn-cluster
> ./flink-sql-client.jar --file dev.sql


Re: Async I/O operator cannot complete checkpoints

2021-10-11 Thread Caizhi Weng
Hi!

The image cannot be displayed in the email; please check.

李一飞  wrote on Tue, Oct 12, 2021 at 10:33 AM:

> The async I/O operator cannot complete checkpoints. Could you help me figure out what the cause is?
>
>
>
>


Re: Timeout settings for Flink jobs?

2021-10-11 Thread Caizhi Weng
Hi!

There is currently no such setting. You need to rely on an external system
to read the execution time (from Flink's job information, see [1]) and
cancel the job once it exceeds the time limit.

Could you elaborate more on your use case? Are you running a streaming job
or a batch job? For streaming jobs it is rare to have a time limit.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid
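
For example, a rough sketch of such an external watchdog could look like the
following (a minimal sketch only: the REST address, the "duration" field of
the job overview response and the PATCH cancel request are assumptions that
should be verified against [1] for your Flink version):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobTimeoutWatchdog {
    public static void main(String[] args) throws Exception {
        String restAddress = "http://jobmanager:8081"; // assumed REST endpoint of the cluster
        String jobId = args[0];                        // job id to watch
        long limitMillis = Long.parseLong(args[1]);    // time limit in milliseconds
        HttpClient client = HttpClient.newHttpClient();
        Pattern durationField = Pattern.compile("\"duration\"\\s*:\\s*(\\d+)");

        while (true) {
            // Read the job overview and extract its running duration.
            HttpResponse<String> overview = client.send(
                    HttpRequest.newBuilder(URI.create(restAddress + "/jobs/" + jobId)).GET().build(),
                    HttpResponse.BodyHandlers.ofString());
            Matcher m = durationField.matcher(overview.body());
            if (m.find() && Long.parseLong(m.group(1)) > limitMillis) {
                // PATCH /jobs/<jobid> asks the JobManager to cancel the job.
                client.send(
                        HttpRequest.newBuilder(URI.create(restAddress + "/jobs/" + jobId))
                                .method("PATCH", HttpRequest.BodyPublishers.noBody()).build(),
                        HttpResponse.BodyHandlers.ofString());
                return;
            }
            Thread.sleep(10_000); // poll every 10 seconds
        }
    }
}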

Sharon Xie  wrote on Tue, Oct 12, 2021 at 3:42 AM:

> Hi there,
>
> We have a use case where we want to terminate a job when a time limit
> is reached. Is there a Flink setting that we can use for this use case?
>
>
> Thanks,
> Sharon
>


Async I/O operator cannot complete checkpoints

2021-10-11 Thread 李一飞
The async I/O operator cannot complete checkpoints. Could you help me figure out what the cause is?

Does Flink SQL plan to support a MySQL catalog?

2021-10-11 Thread 赵旭晨
Currently Flink's JdbcCatalog only supports PostgreSQL. Are there plans to support MySQL? Since our company's metadata storage is standardized on MySQL, it is unlikely that we can also introduce PostgreSQL. Or, to put the question the other way around: why hasn't the Flink community supported a MySQL catalog so far? Are there any concerns?

Re: Does the flink sql support checkpoints

2021-10-11 Thread Caizhi Weng
(Forwarding this to the user mailing list as this mail is written in
English)

Hi!

I think problem 1 is the expected behavior. Is this behavior inconvenient
for you? If so, why?

For problem 2, could you explain in detail how you run the word count
program and where you store the counting result? It might be that you're
storing the results in a sink which only updates the result after a
successful checkpoint.

王小宅的蜗居生活  wrote on Tue, Oct 12, 2021 at 9:32 AM:

> The flink version is v1.13.2
>
> 王小宅的蜗居生活  wrote on Mon, Oct 11, 2021 at 9:01 PM:
>
> > Use flink sql for real-time calculation (deployment mode: on yarn). To
> use
> > the checkpoint, you need to configure the following in the
> flink-conf.yaml:
> >
> >
> > state.backend: filesystem
> >
> > state.checkpoints.dir: hdfs:///flink/flink-checkpoints
> >
> > state.savepoints.dir: hdfs:///flink/flink-savepoints
> >
> > state.checkpoints.num-retained: 10
> >
> >
> > There are two problems:
> >
> > 1.checkpoint generates a folder at the initial stage in the HDFS.
> >
> > 2. The savepoint cannot record the data status.
> >
> > For example: The simple wordcount cannot record the accumulated value.
> >
>


Re: PyFlink JDBC SQL Connector for SQL Server

2021-10-11 Thread Dian Fu
Hi,

Currently it only supports the Derby, MySQL and PostgreSQL dialects. The
'sqlserver' dialect is still not supported. There is a ticket
https://issues.apache.org/jira/browse/FLINK-14101 for this.

Regards,
Dian

On Mon, Oct 11, 2021 at 9:43 PM Schmid Christian 
wrote:

> Hi all
>
>
>
> According to the official documentation (Table API / JDBC SQL Connector
> v.1.14.0) "the JDBC connector allows reading data from and writing data
> into *any relational databases* with a JDBC driver".
>
> At the moment we are using SQL Server in conjunction with Flink and Java,
> which works perfectly fine. Now we try to fetch Data from a Kafka-Topic and
> write it to a SQL Server sink using *PyFlink*.
>
> We succeeded in fetching the data from the kafka topic, but were not able
> to establish a connection to SQL Server.
>
>
>
> Our code looks as follows:
>
>
>
> import os
>
>
>
> from pyflink.datastream import StreamExecutionEnvironment
>
> from pyflink.table import
>  StreamTableEnvironment, DataTypes, EnvironmentSettings, CsvTableSink, 
> WriteMode
>
> from pyflink.table.descriptors import Schema, Kafka, Json
>
>
>
> def main():
>
> # Create streaming environment
>
> env = StreamExecutionEnvironment.get_execution_environment()
>
>
>
> settings = EnvironmentSettings.new_instance()\
>
> .in_streaming_mode()\
>
> .use_blink_planner()\
>
> .build()
>
>
>
> # create table environment
>
> tbl_env = StreamTableEnvironment.create(
> stream_execution_environment=env, environment_settings=settings)
>
>
>
> # add kafka connector dependency
>
> kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__
> )), 'flink-sql-connector-kafka_2.11-1.13.0.jar')
>
>
>
> # add jdbc connector dependency
>
> jdbc_jar = os.path.join(os.path.abspath(os.path.dirname(__file__
> )),'flink-connector-jdbc_2.11-1.13.2.jar')
>
> mssql_jar = os.path.join(os.path.abspath(os.path.dirname(__file__
> )),'mssql-jdbc-8.2.2.jre8.jar')
>
>
>
> tbl_env.get_config()\
>
> .get_configuration().set_string("parallelism.default", "1"
> )\
>
> .set_string("pipeline.jars", "file:///{};file:///{}"
> .format(kafka_jar, jdbc_jar))\
>
> .set_string("pipeline.classpaths", "file:///{}"
> .format(mssql_jar))
>
>
>
> kafka_table_sql = """
>
> CREATE TABLE kafka (
>
> [..] VARCHAR,
>
> data ROW(
>
> [..] ROW(
>
> [..] VARCHAR,
>
> [..] VARCHAR
>
> ))
>
> ) WITH (
>
> 'connector' = 'kafka',
>
> 'property-version' = 'universal',
>
> 'properties.bootstrap.servers' = '[..]',
>
> 'topic' = '[..]',
>
> 'scan.startup.mode' = 'earliest-offset',
>
> 'properties.security.protocol' = 'SSL',
>
> 'properties.ssl.endpoint.identification.algorithm' = '',
>
> 'properties.ssl.truststore.location' = '[..]',
>
> 'properties.ssl.truststore.password' = '[..]',
>
> 'properties.ssl.keystore.type' = 'JKS',
>
> 'properties.ssl.keystore.location' = '[..]',
>
> 'properties.ssl.keystore.password' = [..],
>
> 'properties.ssl.key.password' = [..],
>
> 'properties.group.id' = '[..]',
>
> 'format' = 'json'
>
> )
>
> """
>
>
>
>sqlserver_table_sql = """
>
> CREATE TABLE sqltest (
>
> [..] VARCHAR,
>
> [..] VARCHAR
>
> ) WITH (
>
> 'connector' = 'jdbc',
>
> 'driver' = 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
>
> 'url' = 'jdbc:sqlserver://db-server/database-name',
>
> 'username' = '[..]',
>
> 'password' = '[..],
>
> 'table-name' = 'dbo.tablename'
>
> )
>
> """
>
>
>
> # create source table (kafka)
>
> tbl_env.execute_sql(kafka_table_sql)
>
>
>
> # create sink table (sql server)
>
> tbl_env.execute_sql(sqlserver_table_sql)
>
>
>
> # copy data from source to sink
>
> tbl_env.execute_sql(
> "INSERT INTO sqltest SELECT [..], [..] FROM kafka").wait()
>
>
>
> if __name__ == '__main__':
>
> main()
>
>
>
>
>
> Which lead to an exception (java.lang.IllegalStateException: *Cannot
> handle such jdbc url* ..):
>
>
>
> Traceback (most recent call last):
>
>   File "c:/projects/flink/kafka_csv_jdbc.py", line 122, in 
>
> main()
>
>   File "c:/projects/flink/kafka_csv_jdbc.py", line 119, in main
>
> tbl_env.execute_sql("[..]").wait()
>
>   File
> "C:\projects\flink\flink-evn\lib\site-packages\pyflink\table\table_environment.py",
> line 804, in execute_sql
>
> return TableResult(self._j_tenv.executeSql(stmt))
>
>   File
> "C:\projects\flink\flink-evn\lib\site-packages\py4j\java_gateway.py", line
> 1286, in __call__
>
> answer, 

Re: Does the flink sql support checkpoints

2021-10-11 Thread 王小宅的蜗居生活
The flink version is v1.13.2

王小宅的蜗居生活  wrote on Mon, Oct 11, 2021 at 9:01 PM:

> Use flink sql for real-time calculation (deployment mode: on yarn). To use
> the checkpoint, you need to configure the following in the flink-conf.yaml:
>
>
> state.backend: filesystem
>
> state.checkpoints.dir: hdfs:///flink/flink-checkpoints
>
> state.savepoints.dir: hdfs:///flink/flink-savepoints
>
> state.checkpoints.num-retained: 10
>
>
> There are two problems:
>
> 1.checkpoint generates a folder at the initial stage in the HDFS.
>
> 2. The savepoint cannot record the data status.
>
> For example: The simple wordcount cannot record the accumulated value.
>


Does the flink sql support checkpoints

2021-10-11 Thread 王小宅的蜗居生活
We use Flink SQL for real-time calculation (deployment mode: on YARN). To use
checkpoints, you need to configure the following in flink-conf.yaml:


state.backend: filesystem

state.checkpoints.dir: hdfs:///flink/flink-checkpoints

state.savepoints.dir: hdfs:///flink/flink-savepoints

state.checkpoints.num-retained: 10


There are two problems:

1. Checkpointing generates a folder at the initial stage in HDFS.

2. The savepoint cannot record the data status.

For example, the simple word count cannot record the accumulated value.


Flink 1.11 loses track of event timestamps and watermarks after process function

2021-10-11 Thread Ahmad Alkilani
Flink 1.11
I have a simple Flink application that reads from Kafka, uses event
timestamps, assigns timestamps and watermarks, and then keys by a field and
uses a KeyedProcessFunction.

The keyed process function outputs events from within the `processElement`
method using `out.collect`. No timers are used to collect or output any
elements (or do anything for that matter).

I also have a simple print statement that shows event time and waterMark
within the process function.

if (waterMark <= 0)
  println(
s"""
   |eventTimestamp:$eventTimestamp
   |waterMark: $waterMark
   |""".stripMargin)


If the process function simply does nothing with the incoming records,
i.e., does not emit any records/data as a result of an input element, then
you'll see the Water Mark start with -Max Long and then progress just fine
as expected. If I add `out.collect()` then the watermark stops
progressing and the above print statement prints for every record.

The environment has
`setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` set.

The source start out something like this:

someKafkaSource.flatMap(_.someTransofmrationToEventType.filter(_.something
!= 0))
  .assignTimestampsAndWatermarks(WatermarkStrategy
.forBoundedOutOfOrderness[Event](Duration.ofSeconds(10))
.withIdleness(Duration.ofSeconds(30))
.withTimestampAssigner(new SerializableTimestampAssigner[Event] {
  override def extractTimestamp(element: Event, recordTimestamp:
Long): Long = {
if (element.eventTimeUTCMillis > 0) element.eventTimeUTCMillis
else recordTimestamp
  }
})

The sink is a custom Rich Sink implementation:
 resultStream.addSink(new CustomSink()}

I recall seeing a thread somewhere indicating this could be a Flink bug but
I can't seem to track it down again.
Happy to provide more information. For what it's worth, the
KeyedProcessFunction used to be a GlobalWindow with a custom Trigger and
Evictor but has since been replaced in an attempt to solve the watermark
issue with no success so far.

Do I have to use assignTimestampsAndWatermarks again after the
KeyedProcessFunction?

Full job flow for completeness:

Kafka -> Flink Kafka source -> flatMap (map & filter) ->
assignTimestampsAndWaterMarks -> map Function -> Key By -> Keyed Process
Function -> Async IO -> Custom Sink

Much obliged.


Timeout settings for Flink jobs?

2021-10-11 Thread Sharon Xie
Hi there,

We have a use case where we want to terminate a job when a time limit
is reached. Is there a Flink setting that we can use for this use case?


Thanks,
Sharon


Re: Empty Kafka topics and watermarks

2021-10-11 Thread Piotr Nowojski
Great, thanks!

On Mon, Oct 11, 2021 at 17:24, James Sandys-Lumsdaine  wrote:

> Ah thanks for the feedback. I can work around for now but will upgrade as
> soon as I can to the latest version.
>
> Thanks very much,
>
> James.
> --
> *From:* Piotr Nowojski 
> *Sent:* 08 October 2021 13:17
> *To:* James Sandys-Lumsdaine 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Empty Kafka topics and watermarks
>
> Hi James,
>
> I believe you have encountered a bug that we have already fixed [1]. The
> small problem is that in order to fix this bug, we had to change some
> `@PublicEvolving` interfaces and thus we were not able to backport this fix
> to earlier minor releases. As such, this is only fixed starting from 1.14.x.
>
> Best,
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-18934
>
> On Fri, Oct 8, 2021 at 11:55, James Sandys-Lumsdaine  wrote:
>
> Hi everyone,
>
> I'm putting together a Flink workflow that needs to merge historic data
> from a custom JDBC source with a Kafka flow (for the realtime data). I have
> successfully written the custom JDBC source that emits a watermark for the
> last event time after all the DB data has been emitted but now I face a
> problem when joining with data from the Kafka stream.
>
> I register a timer in my KeyedCoProcessFunction joining the DB stream
> with live Kafka stream so I can emit all the "batch" data from the DB in
> one go when completely read up to the watermark but the timer never fires
> as the Kafka stream is empty and therefore doesn't emit a watermark. My
> Kafka stream will allowed to be empty since all the data will have been
> retrieved from the DB call so I only expect new events to appear over
> Kafka. Note that if I replace the Kafka input with a simple
> env.fromCollection(...) empty list then the timer triggers fine as Flink
> seems to detect it doesn't need to wait for any input from stream 2. So it
> seems to be something related to the Kafka stream status that is preventing
> the watermark from advancing in the KeyedCoProcessFunction.
>
> I have tried configuring the Kafka stream timestamp and watermark
> strategies to so the source is marked as idle after 10 seconds but still it
> seems the watermark in the join operator combining these 2 streams is not
> advancing. (See example code below).
>
> Maybe this is my bad understanding but I thought if an input stream into a
> KeyedCoProcessFunction is idle then it wouldn't be considered by the
> operator for forwarding the watermark i.e. it would forward the non-idle
> input stream's watermark and not do a min(stream1WM, stream2WM). With the
> below example I never see the onTimer fire and the only effect the
> withIdleness() strategy has is to stop the print statements in
> onPeriodicEmit() happening after 5 seconds (env periodic emit is set to the
> default 200ms so I see 25 rows before it stops).
>
> The only way I can get my KeyedCoProcessFunction timer to fire is to force
> an emit of the watermark I want in the onPeriodicEmit() after x numbers of
> attempts to advance an initial watermark i.e. if onPeriodicEmit() is called
> 100 times and the "latestWatermark" is still Long.MIN_VALUE then I emit the
> watermark I want so the join can progress. This seems like a nasty hack to
> me but perhaps something like this is actually necessary?
>
> I am currently using Flink 1.12.3, a Confluent Kafka client 6.1.1. Any
> pointers would be appreciated.
>
> Thanks in advance,
>
> James.
>
> FlinkKafkaConsumer positionsFlinkKafkaConsumer = new
> FlinkKafkaConsumer<>("poc.positions",
> ConfluentRegistryAvroDeserializationSchema.forSpecific(Position.class,
> SchemaRegistryURL), kafkaConsumerProperties);
>
> positionsFlinkKafkaConsumer.setStartFromEarliest();
>
> positionsFlinkKafkaConsumer.assignTimestampsAndWatermarks(
>
>new WatermarkStrategy() {
>
>   @Override
>
>   public TimestampAssigner
> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
>
> return (event, recordTimestamp) -> {
>
> return event.getPhysicalFrom();
>
> };
>
> }
>
>
>
>   @Override
>
>   public WatermarkGenerator
> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>
> return new WatermarkGenerator() {
>
> public long latestWatermark = Long.MIN_VALUE;
>
>
>
> @Override
>
> public void onEvent(Position event, long
> timestamp, WatermarkOutput output) {
>
> long eventWatermark =
> event.getPhysicalFrom();
>
> if (eventWatermark > latestWatermark)
>
> latestWatermark = eventWatermark;
>
> }
>
>
>
> @Override
>
> public void 

Re: Empty Kafka topics and watermarks

2021-10-11 Thread James Sandys-Lumsdaine
Ah thanks for the feedback. I can work around for now but will upgrade as soon 
as I can to the latest version.

Thanks very much,

James.

From: Piotr Nowojski 
Sent: 08 October 2021 13:17
To: James Sandys-Lumsdaine 
Cc: user@flink.apache.org 
Subject: Re: Empty Kafka topics and watermarks

Hi James,

I believe you have encountered a bug that we have already fixed [1]. The small 
problem is that in order to fix this bug, we had to change some 
`@PublicEvolving` interfaces and thus we were not able to backport this fix to 
earlier minor releases. As such, this is only fixed starting from 1.14.x.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18934

On Fri, Oct 8, 2021 at 11:55, James Sandys-Lumsdaine <jas...@hotmail.com> wrote:
Hi everyone,

I'm putting together a Flink workflow that needs to merge historic data from a 
custom JDBC source with a Kafka flow (for the realtime data). I have 
successfully written the custom JDBC source that emits a watermark for the last 
event time after all the DB data has been emitted but now I face a problem when 
joining with data from the Kafka stream.

I register a timer in my KeyedCoProcessFunction joining the DB stream with live 
Kafka stream so I can emit all the "batch" data from the DB in one go when 
completely read up to the watermark but the timer never fires as the Kafka 
stream is empty and therefore doesn't emit a watermark. My Kafka stream is 
allowed to be empty since all the data will have been retrieved from the DB 
call so I only expect new events to appear over Kafka. Note that if I replace 
the Kafka input with a simple env.fromCollection(...) empty list then the timer 
triggers fine as Flink seems to detect it doesn't need to wait for any input 
from stream 2. So it seems to be something related to the Kafka stream status 
that is preventing the watermark from advancing in the KeyedCoProcessFunction.

I have tried configuring the Kafka stream timestamp and watermark strategies 
so the source is marked as idle after 10 seconds, but still it seems the 
watermark in the join operator combining these 2 streams is not advancing. (See 
example code below).

Maybe this is my bad understanding but I thought if an input stream into a 
KeyedCoProcessFunction is idle then it wouldn't be considered by the operator 
for forwarding the watermark i.e. it would forward the non-idle input stream's 
watermark and not do a min(stream1WM, stream2WM). With the below example I 
never see the onTimer fire and the only effect the withIdleness() strategy has 
is to stop the print statements in onPeriodicEmit() happening after 5 seconds 
(env periodic emit is set to the default 200ms so I see 25 rows before it 
stops).

The only way I can get my KeyedCoProcessFunction timer to fire is to force an 
emit of the watermark I want in the onPeriodicEmit() after x numbers of 
attempts to advance an initial watermark i.e. if onPeriodicEmit() is called 100 
times and the "latestWatermark" is still Long.MIN_VALUE then I emit the 
watermark I want so the join can progress. This seems like a nasty hack to me 
but perhaps something like this is actually necessary?

I am currently using Flink 1.12.3, a Confluent Kafka client 6.1.1. Any pointers 
would be appreciated.

Thanks in advance,

James.


FlinkKafkaConsumer positionsFlinkKafkaConsumer = new 
FlinkKafkaConsumer<>("poc.positions", 
ConfluentRegistryAvroDeserializationSchema.forSpecific(Position.class, 
SchemaRegistryURL), kafkaConsumerProperties);

positionsFlinkKafkaConsumer.setStartFromEarliest();

positionsFlinkKafkaConsumer.assignTimestampsAndWatermarks(

   new WatermarkStrategy() {

  @Override

  public TimestampAssigner 
createTimestampAssigner(TimestampAssignerSupplier.Context context) {

return (event, recordTimestamp) -> {

return event.getPhysicalFrom();

};

}



  @Override

  public WatermarkGenerator 
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {

return new WatermarkGenerator() {

public long latestWatermark = Long.MIN_VALUE;



@Override

public void onEvent(Position event, long timestamp, 
WatermarkOutput output) {

long eventWatermark = event.getPhysicalFrom();

if (eventWatermark > latestWatermark)

latestWatermark = eventWatermark;

}



@Override

public void onPeriodicEmit(WatermarkOutput output) {

System.out.printf("Emitting watermark %d\n", 
latestWatermark);

output.emitWatermark(new 
Watermark(latestWatermark));

   

Re: Reset of transient variables in state to default values.

2021-10-11 Thread Alex Drobinsky
It would be difficult to provide even a semblance of the complete product;
however, I can try to provide enough details to reproduce the problem.
A standard source would do:

DataStream<byte[]> stream = env.addSource(
        new FlinkKafkaConsumer<>(topic, new AbstractDeserializationSchema<byte[]>() {
            @Override
            public byte[] deserialize(byte[] bytes) throws IOException {
                return bytes;
            }
        }, properties)).name(topic);


The operator body looks something like this:

public class MultiStorePacketFunction extends
KeyedProcessFunction
implements Serializable {
   private transient ValueState state;

   @Override
   public void processElement(SplitterToMultiStore packet, Context
ctx, Collector out) throws Exception {
  if (packet.hasPackets()) {
 storedPackets.inc(packet.getPackets().getPacketsCount());
  }

  MultiStorePacketState so = state.value();
  if (process(packet, out, so, ctx)) {
 state.update(null);
 state.clear();
  } else {
 state.update(so);
  }
   }

public String generateNextFilename(String sessionKey, int partNumber) {
  String path = DirectoryService.getInstance().bookDirectory();
  return path + File.separator + sessionKey + "-" + partNumber + ".pcap";
   }

   private void storeContent(Collector collector,
MultiStorePacketState state, SplitterToMultiStore packets) throws
Exception {
  assert (packets != null);
  assert (packets.hasPackets());

  if ( state.currentFile == null) {
 openFile(collector, state, packets);
  }

  Utils.continueWriteToPcap(state.currentFile,
packets.getPackets().getPacketsList());
  state.fileOffset = state.currentFile.length();

  tryToCloseFile(collector, state);
   }

   static public String extractExportedFileName(String fileName) {
  String path[] = fileName.split("/");
  return path[path.length - 2] + "/" + path[path.length - 1];
   }

   private void openFile(Collector collector,
MultiStorePacketState state, SplitterToMultiStore packets) throws
Exception {
  state.fileIsOpened = true;
  state.fileName = generateNextFilename(state.sessionKey, state.partNumber);
  state.exportedFileName = extractExportedFileName(state.fileName);

// -> Here RandomAccessFile created
  state.currentFile = Utils.startWriteToPcap(state.fileName,
packets.getPackets().getPacketsList());
  state.fileOffset = state.currentFile.length();
  state.partNumber++;
   }

   private void tryToCloseFile(Collector collector,
MultiStorePacketState state) throws IOException {
  if (state.currentFile.length() <
StorePacketConfigurationParameters.partSizeLimit) {
 return;
  }
  closeFile(collector, state);
   }

   private void closeFile(Collector collector,
MultiStorePacketState state) throws IOException {
  state.currentFile.close();
  state.currentFile = null;
  state.fileIsOpened = false;
  ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
  outputBuilder.getUsualBuilder().setFileName(state.exportedFileName);
  outputBuilder.setSessionType(SessionType.Multi);
  outputBuilder.setSessionKey(state.sessionKey);
  var classifierOutput = outputBuilder.build();
  state.sessionMetadata.add(classifierOutput);
  collector.collect(classifierOutput);
   }

public boolean process(SplitterToMultiStore packet,
Collector collector, MultiStorePacketState so,
Context context) throws Exception {

  // First message
  if (packet.hasClassificationResult()) {
 sendClassificationResult(packet, collector, so);
 return false;
  }

  // Last message
  if (packet.hasSessionClosure()) {
 if (so.isCoverageIncorrect) {
return true;
 }
 handleSessionClosure(packet, collector, so, context);
 return true;
  }

  if (so.isCoverageIncorrect) {
 return false;
  }
  storeContent(collector, so, packet);

  // File could be already close e.g. it reached expected size.
  if (so.currentFile != null) {
 setupTimer(so, context.timerService());
  }
  return false;
   }

   private void handleSessionClosure(SplitterToMultiStore packet,
Collector collector, MultiStorePacketState so,
Context context) throws IOException {
  if (so.currentFile != null) {
 closeFile(collector, so);
  }
  ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
  outputBuilder.setSessionKey(packet.getSessionKey());
  outputBuilder.setSessionType(packet.getSessionType());
  var messageBuilder = outputBuilder.getLastBuilder();
  messageBuilder.addAllAggregatedSession(so.sessionMetadata);
  outputBuilder.setLast(messageBuilder.build());
  var output = outputBuilder.build();
  collector.collect(output);
  context.timerService().deleteProcessingTimeTimer(so.timerValue);
  state.clear();
   }

   private void 

Re: Reset of transient variables in state to default values.

2021-10-11 Thread JING ZHANG
Hi Alex,
It is a little weird.
Would you please provide a program which can reproduce the problem,
including the DataStream job code and related classes? I need to do some
debugging to find out the reason.

Best,
JING ZHANG


Alex Drobinsky  wrote on Mon, Oct 11, 2021 at 5:50 PM:

> Hi Jing Zhang,
>
> I'm using the FileSystem backend. I also implemented ReadObject function
> to support proper restart procedure:
>
> private void readObject(ObjectInputStream ois)
> throws ClassNotFoundException, IOException {
> ois.defaultReadObject();
> logger.info("Deserialized MultiStorePacketState: " + this.toString());
>
> // No need to do anything in case of empty file
> if (fileName.isEmpty()) {
> return;
> }
> currentFile = new RandomAccessFile(fileName,"rw");
> currentFile.seek(fileOffset);
> }
>
> However, according to logs this function wasn't called.
>
> Btw, it could be beneficial to add this kind of State object e.g. FileState 
> which will encapsulate serialization / deserialization for RandomAccessFile 
> although the concept itself is a bit contradictory to regular state.
>
> Currently, I implemented and tested a workaround via addition of the boolean 
> variable isFileOpened, however it's awkward because I need to check the state 
> of the transient variable every time I use state.value().
>
> So should it be expected that transient variables in state would be resetted 
> to default values ?
>
>
> On Mon, Oct 11, 2021 at 12:33, JING ZHANG  wrote:
>
>> Hi, Alex
>> What state backend do you choose?
>> If you choose MemoryStateBackend or FsStateBackend, `transient` keyword
>> may not have effect because MemoryStateBackend does not serialize state for
>> regular read/write accesses but keeps it as objects on the heap.
>> If you choose RocksDBStateBackend, I thought it was expected behavior
>> because RocksDBStateBackend stores all state as byte arrays in embedded
>> RocksDB instances. Therefore, it de/serializes the state of a key for every
>> read/write access.  CurrentFile is null because the transient variable
>> would not be serialized by default.
>>
>> Best,
>> JING ZHANG
>>
>>
>> Alex Drobinsky  wrote on Mon, Oct 11, 2021 at 4:33 PM:
>>
>>> Dear flink community,
>>>
>>> I have following state class ( irrelevant fields removed )
>>> public class MultiStorePacketState implements Serializable {
>>>
>>> public transient RandomAccessFile currentFile = null;
>>> public long timerValue;
>>> public String fileName;
>>> public String exportedFileName;
>>> public String sessionKey;
>>> public long fileOffset = 0;
>>>
>>> }
>>>
>>> Once in a while, currentFile became *nullified, *this happens after I
>>> extract state via
>>>
>>> MultiStorePacketState so = state.value();
>>>
>>> The frequency of this behaviour is similar to checkpoint interval ( 
>>> checkpoint interval defined as 5 seconds and first occurence of this 
>>> problem is also 5 seconds), otherwise I don't have any clues to a possible 
>>> explanation.
>>>
>>> Is it an expected side effect of checkpoint procedure ?
>>>
>>> Best regards,
>>> Alex
>>>
>>>


Re: Unsubscribe

2021-10-11 Thread JING ZHANG
Hi,

To unsubscribe from the Flink user mailing list, send an email to
user-unsubscr...@flink.apache.org
To unsubscribe from the Flink dev mailing list, send an email to
dev-unsubscr...@flink.apache.org

To unsubscribe from the Flink user-zh mailing list, send an email to
user-zh-unsubscr...@flink.apache.org

For more information, please go to [1].

[1] https://flink.apache.org/community.html#mailing-lists

Best,
JING ZHANG


Jesús Vásquez  wrote on Mon, Oct 11, 2021 at 9:40 PM:

> Hello i want to unsubscribe
>


PyFlink JDBC SQL Connector for SQL Server

2021-10-11 Thread Schmid Christian
Hi all

According to the official documentation (Table API / JDBC SQL Connector 
v.1.14.0) "the JDBC connector allows reading data from and writing data into 
any relational databases with a JDBC driver".
At the moment we are using SQL Server in conjunction with Flink and Java, which 
works perfectly fine. Now we try to fetch Data from a Kafka-Topic and write it 
to a SQL Server sink using PyFlink.
We succeeded in fetching the data from the kafka topic, but were not able to 
establish a connection to SQL Server.

Our code looks as follows:

import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes, 
EnvironmentSettings, CsvTableSink, WriteMode
from pyflink.table.descriptors import Schema, Kafka, Json

def main():
# Create streaming environment
env = StreamExecutionEnvironment.get_execution_environment()

settings = EnvironmentSettings.new_instance()\
.in_streaming_mode()\
.use_blink_planner()\
.build()

# create table environment
tbl_env = 
StreamTableEnvironment.create(stream_execution_environment=env, 
environment_settings=settings)

# add kafka connector dependency
kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)), 
'flink-sql-connector-kafka_2.11-1.13.0.jar')

# add jdbc connector dependency
jdbc_jar = 
os.path.join(os.path.abspath(os.path.dirname(__file__)),'flink-connector-jdbc_2.11-1.13.2.jar')
mssql_jar = 
os.path.join(os.path.abspath(os.path.dirname(__file__)),'mssql-jdbc-8.2.2.jre8.jar')

tbl_env.get_config()\
.get_configuration().set_string("parallelism.default", "1")\
.set_string("pipeline.jars", 
"file:///{};file:///{}".format(kafka_jar, jdbc_jar))\
.set_string("pipeline.classpaths", 
"file:///{}".format(mssql_jar))

kafka_table_sql = """
CREATE TABLE kafka (
[..] VARCHAR,
data ROW(
[..] ROW(
[..] VARCHAR,
[..] VARCHAR
))
) WITH (
'connector' = 'kafka',
'property-version' = 'universal',
'properties.bootstrap.servers' = '[..]',
'topic' = '[..]',
'scan.startup.mode' = 'earliest-offset',
'properties.security.protocol' = 'SSL',
'properties.ssl.endpoint.identification.algorithm' = '',
'properties.ssl.truststore.location' = '[..]',
'properties.ssl.truststore.password' = '[..]',
'properties.ssl.keystore.type' = 'JKS',
'properties.ssl.keystore.location' = '[..]',
'properties.ssl.keystore.password' = [..],
'properties.ssl.key.password' = [..],
'properties.group.id' = '[..]',
'format' = 'json'
)
"""

   sqlserver_table_sql = """
CREATE TABLE sqltest (
[..] VARCHAR,
[..] VARCHAR
) WITH (
'connector' = 'jdbc',
'driver' = 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
'url' = 'jdbc:sqlserver://db-server/database-name',
'username' = '[..]',
'password' = '[..],
'table-name' = 'dbo.tablename'
)
"""

# create source table (kafka)
tbl_env.execute_sql(kafka_table_sql)

# create sink table (sql server)
tbl_env.execute_sql(sqlserver_table_sql)

# copy data from source to sink
tbl_env.execute_sql("INSERT INTO sqltest SELECT [..], [..] FROM 
kafka").wait()

if __name__ == '__main__':
main()


Which led to an exception (java.lang.IllegalStateException: Cannot handle such 
jdbc url ..):

Traceback (most recent call last):
  File "c:/projects/flink/kafka_csv_jdbc.py", line 122, in 
main()
  File "c:/projects/flink/kafka_csv_jdbc.py", line 119, in main
tbl_env.execute_sql("[..]").wait()
  File 
"C:\projects\flink\flink-evn\lib\site-packages\pyflink\table\table_environment.py",
 line 804, in execute_sql
return TableResult(self._j_tenv.executeSql(stmt))
  File "C:\projects\flink\flink-evn\lib\site-packages\py4j\java_gateway.py", 
line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\projects\flink\flink-evn\lib\site-packages\pyflink\util\exceptions.py", 
line 146, in deco
return f(*a, **kw)
  File "C:\projects\flink\flink-evn\lib\site-packages\py4j\protocol.py", line 
328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a sink for 
writing table 'default_catalog.default_database.sqltest'.

Table options are:

'connector'='jdbc'
'driver'='com.microsoft.sqlserver.jdbc.SQLServerDriver'
'password'='[..]'
'table-name'='[..]'
'url'='jdbc:sqlserver:// [..]'
'username'='[..]'

Unsubscribe

2021-10-11 Thread Jesús Vásquez
Hello, I want to unsubscribe.


offset of TumblingEventTimeWindows

2021-10-11 Thread 杨浩
Since we are in China (UTC+08:00), we should use Time.hours(-8) as the offset
when windowing a day's data,


// daily tumbling event-time windows offset by -8 hours.
input.keyBy().window(TumblingEventTimeWindows.of(Time.days(1),Time.hours(-8))).()




Shall we also set Time.hours(-8) as the offset for minute-level state?


input.keyBy().window(TumblingEventTimeWindows.of(Time.seconds(5))).()
input.keyBy().window(TumblingEventTimeWindows.of(Time.seconds(5),Time.hours(-8)))
.()

Unsubscribe

2021-10-11 Thread Cheney Jin
Unsubscribe

Re: How to stop a Flink job via the API

2021-10-11 Thread 刘建刚
You can try the following two approaches:
1. When the stop condition is reached, notify an external tool in some way and let that external tool stop the job.
2. You can now get the jobId inside a RichFunction, but not the applicationId; you could check whether the code can be changed to obtain it, for example via an environment variable, and then call the RESTful
API to stop the job.

lei-tian  wrote on Mon, Oct 11, 2021 at 9:11 AM:

> Because the stop condition has to be evaluated inside the code, the job probably still has to be stopped from inside the code.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2021-10-11 09:06:17, "995626544" <995626...@qq.com.INVALID> wrote:
> >hi:
> >This should be fairly easy to do with an external script: stop the job and then start it again.
> >
> >
> >
> >--- Original message ---
> >From: "lei-tian" >Sent: Monday, October 11, 2021, 9:02 AM
> >To: "user-zh" >Subject: How to stop a Flink job via the API
> >
> >
> >Hello:
> >
> I have a Flink program that reads HBase and needs to call an interface within a fixed time window every day. It should stop when it is outside the specified time window or when the amount of processed data reaches a threshold, and then repeat the same process the next day. The problem now is how to cancel the job from inside the code, the way the UI does, so that on the next day it can continue processing the remaining data from where the previous day left off. There is a savepoint-based approach, which needs the jobId and applicationId, but it seems these cannot currently be obtained inside the code. Does anyone have ideas?
>


Re: After Receiving Kafka Data ( getting this error) for s3 bucket access

2021-10-11 Thread Dhiru
 
Sorry, there was an issue with the path of the S3 bucket. Got this fixed.
Sorry for troubling you guys.

On Sunday, October 10, 2021, 12:33:16 PM EDT, Dhiru  wrote:

We have configured the S3 bucket s3a://msc-sandbox-test-bucket. I am not sure
how some extra characters got added to the bucket path?
java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path 
in absolute URI: 
s3a://msc-sandbox-test-bucket3TjIvqnUkP1YBpoy.3MxSF/3TjIwLWrI71fbMZmGYK7rV/31-07-2017
at org.apache.flink.core.fs.Path.initialize(Path.java:230)at 
org.apache.flink.core.fs.Path.(Path.java:139)at 
org.apache.flink.core.fs.Path.(Path.java:93)at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.assembleBucketPath(Buckets.java:353)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.getOrCreateBucketForBucketId(Buckets.java:319)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:304)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:492)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at 
org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:104)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) 
   at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)Caused
 by: java.net.URISyntaxException: Relative path in absolute URI: 
s3a://msc-sandbox-test-bucket3TjIvqnUkP1YBpoy.3MxSF/3TjIwLWrI71fbMZmGYK7rV/31-07-2017
at java.base/java.net.URI.checkPath(Unknown Source)at 
java.base/java.net.URI.(Unknown Source)at 
org.apache.flink.core.fs.Path.initialize(Path.java:228)... 36 more

  

Unsubscribe

2021-10-11 Thread Zhanshun Zou
Unsubscribe


Re: Reset of transient variables in state to default values.

2021-10-11 Thread Alex Drobinsky
Hi Jing Zhang,

I'm using the FileSystem backend. I also implemented a readObject function to
support a proper restart procedure:

private void readObject(ObjectInputStream ois)
throws ClassNotFoundException, IOException {
ois.defaultReadObject();
logger.info("Deserialized MultiStorePacketState: " + this.toString());

// No need to do anything in case of empty file
if (fileName.isEmpty()) {
return;
}
currentFile = new RandomAccessFile(fileName,"rw");
currentFile.seek(fileOffset);
}

However, according to logs this function wasn't called.

Btw, it could be beneficial to add this kind of state object, e.g. a
FileState that encapsulates serialization/deserialization for
RandomAccessFile, although the concept itself somewhat contradicts
regular state.

Currently, I have implemented and tested a workaround by adding a boolean
variable isFileOpened; however, it's awkward because I need to check the
state of the transient variable every time I use state.value().
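
For illustration, a minimal sketch of that kind of lazy re-initialization
(field names follow the MultiStorePacketState class shown in this thread;
the helper itself is hypothetical) could look like:

// Re-open the transient file handle from the serializable fields if it was
// reset to null by state de/serialization; assumes java.io.RandomAccessFile.
private MultiStorePacketState restoreTransients(MultiStorePacketState so) throws IOException {
    if (so.currentFile == null && so.fileName != null && !so.fileName.isEmpty()) {
        so.currentFile = new RandomAccessFile(so.fileName, "rw");
        so.currentFile.seek(so.fileOffset);
    }
    return so;
}

Every access would then go through
MultiStorePacketState so = restoreTransients(state.value());
instead of using state.value() directly.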

So is it expected that transient variables in state are reset to their
default values?


On Mon, Oct 11, 2021 at 12:33, JING ZHANG  wrote:

> Hi, Alex
> What state backend do you choose?
> If you choose MemoryStateBackend or FsStateBackend, `transient` keyword
> may not have effect because MemoryStateBackend does not serialize state for
> regular read/write accesses but keeps it as objects on the heap.
> If you choose RocksDBStateBackend, I thought it was expected behavior
> because RocksDBStateBackend stores all state as byte arrays in embedded
> RocksDB instances. Therefore, it de/serializes the state of a key for every
> read/write access.  CurrentFile is null because the transient variable
> would not be serialized by default.
>
> Best,
> JING ZHANG
>
>
> Alex Drobinsky  wrote on Mon, Oct 11, 2021 at 4:33 PM:
>
>> Dear flink community,
>>
>> I have following state class ( irrelevant fields removed )
>> public class MultiStorePacketState implements Serializable {
>>
>> public transient RandomAccessFile currentFile = null;
>> public long timerValue;
>> public String fileName;
>> public String exportedFileName;
>> public String sessionKey;
>> public long fileOffset = 0;
>>
>> }
>>
>> Once in a while, currentFile became *nullified, *this happens after I
>> extract state via
>>
>> MultiStorePacketState so = state.value();
>>
>> The frequency of this behaviour is similar to checkpoint interval ( 
>> checkpoint interval defined as 5 seconds and first occurence of this problem 
>> is also 5 seconds), otherwise I don't have any clues to a possible 
>> explanation.
>>
>> Is it an expected side effect of checkpoint procedure ?
>>
>> Best regards,
>> Alex
>>
>>


Re: Reset of transient variables in state to default values.

2021-10-11 Thread JING ZHANG
Hi, Alex
Which state backend do you use?
If you use MemoryStateBackend or FsStateBackend, the `transient` keyword may
have no effect, because MemoryStateBackend does not serialize state for
regular read/write accesses but keeps it as objects on the heap.
If you use RocksDBStateBackend, I think this is expected behavior, because
RocksDBStateBackend stores all state as byte arrays in embedded RocksDB
instances and therefore de/serializes the state of a key on every read/write
access. currentFile is null because transient variables are not serialized by
default.
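
As a minimal, self-contained illustration of that last point (plain Java
serialization, not Flink-specific), a transient field comes back as its
default value after any serialize/deserialize round trip:

import java.io.*;

public class TransientDemo {
    static class Holder implements Serializable {
        transient RandomAccessFile currentFile; // skipped during serialization
        String fileName = "data.bin";           // serialized as usual
    }

    public static void main(String[] args) throws Exception {
        Holder h = new Holder();
        h.currentFile = new RandomAccessFile(h.fileName, "rw");

        // Round-trip through bytes, as a backend that serializes per access would do.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(h);
        oos.flush();
        Holder copy = (Holder) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        System.out.println(copy.fileName);    // prints "data.bin"
        System.out.println(copy.currentFile); // prints "null": default for the transient field
    }
}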

Best,
JING ZHANG


Alex Drobinsky  wrote on Mon, Oct 11, 2021 at 4:33 PM:

> Dear flink community,
>
> I have following state class ( irrelevant fields removed )
> public class MultiStorePacketState implements Serializable {
>
> public transient RandomAccessFile currentFile = null;
> public long timerValue;
> public String fileName;
> public String exportedFileName;
> public String sessionKey;
> public long fileOffset = 0;
>
> }
>
> Once in a while, currentFile became *nullified, *this happens after I
> extract state via
>
> MultiStorePacketState so = state.value();
>
> The frequency of this behaviour is similar to checkpoint interval ( 
> checkpoint interval defined as 5 seconds and first occurence of this problem 
> is also 5 seconds), otherwise I don't have any clues to a possible 
> explanation.
>
> Is it an expected side effect of checkpoint procedure ?
>
> Best regards,
> Alex
>
>


Reset of transient variables in state to default values.

2021-10-11 Thread Alex Drobinsky
Dear Flink community,

I have following state class ( irrelevant fields removed )
public class MultiStorePacketState implements Serializable {

public transient RandomAccessFile currentFile = null;
public long timerValue;
public String fileName;
public String exportedFileName;
public String sessionKey;
public long fileOffset = 0;

}

Once in a while, currentFile becomes *nullified*; this happens after I
extract the state via

MultiStorePacketState so = state.value();

The frequency of this behaviour matches the checkpoint interval (the
checkpoint interval is defined as 5 seconds and the first occurrence of this
problem is also at 5 seconds); otherwise I don't have any clue to a
possible explanation.

Is it an expected side effect of the checkpoint procedure?

Best regards,
Alex


RE: Flink S3 Presto Checkpointing Permission Forbidden

2021-10-11 Thread Denis Nutiu
Hi Rommel,



Thanks for getting back to me and for your time.

I switched to the Hadoop plugin and used the following authentication
method that worked:
*fs.s3a.aws.credentials.provider:
"com.amazonaws.auth.WebIdentityTokenCredentialsProvider"*


Turns out that I was using the wrong credentials provider. Reading
AWSCredentialProvider[1] and seeing that I have the
AWS_WEB_IDENTITY_TOKEN_FILE variable in the container allowed me to find
the correct one.


[1]
https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html


Best,

Denis





*From:* Rommel Holmes 
*Sent:* Saturday, October 9, 2021 02:09
*To:* Denis Nutiu 
*Cc:* user 
*Subject:* Re: Flink S3 Presto Checkpointing Permission Forbidden



You already have s3 request ID, you can easily reach out to AWS tech
support to know what account was used to write to S3. I guess that account
probably doesn't have permission to do the following:



"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:ListBucket"

Then grant the account with the permission in k8s. Then you should be good
to go.









On Fri, Oct 8, 2021 at 6:06 AM Denis Nutiu  wrote:

Hello,



I'm trying to deploy my Flink cluster inside of an AWS EKS using Flink
Native. I want to use S3 as a filesystem for checkpointing, and giving the
following options related to flink-s3-fs-presto:



"-Dhive.s3.endpoint": "https://s3.eu-central-1.amazonaws.com;
"-Dhive.s3.iam-role": "arn:aws:iam::xxx:role/s3-flink"
"-Dhive.s3.use-instance-credentials": "true"
"-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS":
"flink-s3-fs-presto-1.13.2.jar"
"-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS":
"flink-s3-fs-presto-1.13.2.jar"
"-Dstate.backend": "rocksdb"
"-Dstate.backend.incremental": "true"
"-Dstate.checkpoints.dir": "s3://bucket/checkpoints/"
"-Dstate.savepoints.dir": "s3://bucket/savepoints/"



But my job fails with:



2021-10-08 11:38:49,771 WARN
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Could
not properly dispose the private states in the pending checkpoint 45 of job
75bdd6fb6e689961ef4e096684e867bc.
com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service:
Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID:
JEZ3X8YPDZ2TF4T9; S3 Extended Request ID:
u2RBcDpifTnzO4hIOGqgTOKDY+nw6iSeSepd4eYThITCPCpVddIUGMU7jY5DpJBg1LkPuYXiH9c=;
Proxy: null), S3 Extended Request ID:
u2RBcDpifTnzO4hIOGqgTOKDY+nw6iSeSepd4eYThITCPCpVddIUGMU7jY5DpJBg1LkPuYXiH9c=
(Path: s3://bucket/checkpoints/75bdd6fb6e689961ef4e096684e867bc/chk-45)
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573)
~[?:?]
at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.directory(PrestoS3FileSystem.java:450)
~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.delete(PrestoS3FileSystem.java:427)
~[?:?]
at
org.apache.flink.fs.s3presto.common.HadoopFileSystem.delete(HadoopFileSystem.java:160)
~[?:?]
at
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:155)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.disposeOnFailure(FsCheckpointStorageLocation.java:117)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.checkpoint.PendingCheckpoint.discard(PendingCheckpoint.java:588)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:60)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanup$2(CheckpointsCleaner.java:85)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden
(Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request
ID: JEZ3X8YPDZ2TF4T9; S3 Extended Request ID:
u2RBcDpifTnzO4hIOGqgTOKDY+nw6iSeSepd4eYThITCPCpVddIUGMU7jY5DpJBg1LkPuYXiH9c=;
Proxy: null)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1811)
~[?:?]
at

Checkpoint size increasing even though incremental checkpointing is enabled

2021-10-11 Thread Lei Wang
[image: image.png]

The checkpointed data size became bigger and bigger, and the node CPU is
very high when the job is doing checkpointing. But I have enabled
incremental checkpointing: env.setStateBackend(new RocksDBStateBackend(checkpointDir, true));

I am using flink-1.11.2 and aliyun oss as checkpoint storage.


Any insight on this?

Thanks,

Lei


Re: OVER IGNORE NULLS support

2021-10-11 Thread Francesco Guardiani
Seems like IGNORE NULLS is specified in the SQL:2008 spec (paragraph 6.10);
the opposite is called RESPECT NULLS:

<null treatment> ::=
RESPECT NULLS | IGNORE NULLS

Perhaps this is worth supporting; I've opened an issue for that: FLINK-24499.
@Adrian, are you interested in contributing to this issue?



On Sat, Oct 9, 2021 at 4:32 AM Caizhi Weng  wrote:

> Hi!
>
> Currently all built-in aggregate functions ignore null input values, so I
> guess this is the reason why Flink didn't support this syntax.
>
> I'm sort of curious about this syntax. Does it come from the SQL standard?
> What's the opposite of IGNORE NULLS? Is there a NOT IGNORE NULLS and if the
> user specifies this an exception will be thrown when a null value is
> encountered?
>
> Adrian Bednarz  wrote on Fri, Oct 8, 2021 at 9:22 PM:
>
>> Hi,
>>
>> we've been trying to run a query similar to
>> SELECT id, type, LAG(id) IGNORE NULLS OVER (PARTITION BY type ORDER BY
>> ts) AS lastId
>>   FROM Events
>>
>> A query without IGNORE NULLS clause executes just fine. This syntax is
>> supported by Calcite and our clients expect it to work. Our platform uses
>> FlinkSQL to execute certain types of queries and currently such syntax
>> causes jobs to fail with NPE. Here's a stack trace
>>
>> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: SQL validation failed. null
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:164)
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:215)
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
>> at com.example.OverIgnoreNullsJob.main(OverIgnoreNullsJob.java:37)
>> Caused by: java.lang.NullPointerException
>> at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>> at org.apache.calcite.sql.SqlBasicCall.setOperator(SqlBasicCall.java:67)
>> at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:530)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
>> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
>> at
>> org.apache.calcite.sql.type.SqlTypeUtil.deriveType(SqlTypeUtil.java:178)
>> at
>> org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:71)
>> at
>> org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:122)
>> at
>> org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)
>> at
>> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
>> at
>> org.apache.calcite.sql.SqlOverOperator.deriveType(SqlOverOperator.java:86)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
>> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
>> at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
>> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:421)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4061)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3347)
>> at
>> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>> at
>> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>> at
>>