Re: Flink 1.11 Sql client environment yaml

2020-07-18 Thread godfrey he
Hi,

GenericInMemoryCatalog does not support any settings at the moment.
You can refer to [1] for details on the supported catalogs,
and to [2] for details on the supported types.

"Kafka schema registry for schema" is under discussion [3],
which can be ready in 1.12.

The SQL client supports DDL to create a table with the json format [4];
you can use the ROW type to define nested json.
For example:

create table my_table (
  f varchar,
  nest_column row<
    a varchar,
    b int,
    c int
  >
) with (
  ...
)
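
Fields of the nested row can then be queried directly with dot notation;
for example, against the table above:

select f, nest_column.a, nest_column.b from my_table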

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/catalogs.html#catalogs
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html
[3] https://issues.apache.org/jira/browse/FLINK-16048
[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#how-to-create-a-table-with-json-format

Best,
Godfrey


Lian Jiang  wrote on Sat, Jul 18, 2020 at 6:28 AM:

> Hi,
>
> I am experimenting with Flink SQL by following
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sqlClient.html.
> I want to set up an environment yaml to query Kafka data (json in avro
> format). Where can I find the information below?
>
> 1. use GenericInMemoryCatalog (e.g. type, settings)
> 2. use the Kafka schema registry for the schema. The example hard-codes
> the schema in the env yaml.
> 3. other than a UDF, is there a way to easily query deeply nested json in
> Flink SQL?
>
> Appreciate your help!
>
> Regards
> Lian
>


Re: FlinkSQL job name issue after job submission

2020-07-18 Thread godfrey he
Hi Evan,
Thanks for the feedback. There is already an issue [1] tracking this
problem; you can follow it for further progress.

[1] https://issues.apache.org/jira/browse/FLINK-18545


Best,
Godfrey

Jeff Zhang  wrote on Sat, Jul 18, 2020 at 9:52 PM:

> In Zeppelin you can specify the job name of an insert statement, as shown
> below. (If you are interested in Zeppelin, you can join the DingTalk
> group: 32803524)
>
> %flink.ssql(jobName="my job")
>
> insert into sink_kafka select status, direction, cast(event_ts/10
> as timestamp(3)) from source_kafka where status <> 'foo'
>
Evan  wrote on Sat, Jul 18, 2020 at 5:47 PM:
>
>> The code is roughly like this: a Kafka source table and an ES sink
>> table, executed at the end via tableEnv.executeSql("insert into
>> esSinkTable select ... from kafkaSourceTable").
>> After the job is submitted, the job name is
>> "insert-into_someCatalog_someDatabase.someTable".
>>
>>
>> This is not very user friendly. Can I specify the job name myself?
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: How do I trigger clearing custom state in ProcessWindowFunction

2020-07-18 Thread David Anderson
ProcessWindowFunction#process is passed a Context object that contains

  public abstract KeyedStateStore windowState();
  public abstract KeyedStateStore globalState();

which are available for you to use for custom state. Whatever you store in
windowState is scoped to a window, and is cleared when that window is
cleared (this is only useful for windows that may have multiple firings).
On the other hand, state that you create in globalState is retained
indefinitely. If you have an unbounded key space with keys that become
stale (such as sessionIds), you will want to use state TTL [1] on the state
descriptor(s) to arrange for its eventual deletion.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
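
For illustration, here is a minimal sketch of keeping a per-key counter in
globalState with TTL on the descriptor (Event, Result, and the state name
are placeholders, not part of any official example):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class CountingWindowFunction
        extends ProcessWindowFunction<Event, Result, String, TimeWindow> {

    @Override
    public void process(String key, Context ctx, Iterable<Event> events,
                        Collector<Result> out) throws Exception {
        ValueStateDescriptor<Long> desc =
                new ValueStateDescriptor<>("firings-per-key", Long.class);
        // entries not written to for a day become eligible for cleanup,
        // which takes care of stale keys (e.g., old sessionIds)
        desc.enableTimeToLive(
                StateTtlConfig.newBuilder(Time.days(1)).build());

        // globalState: scoped to the key, retained across windows
        ValueState<Long> firings = ctx.globalState().getState(desc);
        long n = firings.value() == null ? 1L : firings.value() + 1L;
        firings.update(n);

        out.collect(new Result(key, n));
    }
}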

Regards,
David

On Sat, Jul 18, 2020 at 2:30 PM ゞ野蠻遊戲χ  wrote:

> Dear all:
>
> How do I clear custom state in ProcessWindowFunction? Because there is no
> onTimer method in ProcessAllWindowFunction.
>
> Thanks
> Jiazhi
>


Re: FlinkSQL job name issue after job submission

2020-07-18 Thread Jeff Zhang
In Zeppelin you can specify the job name of an insert statement, as shown
below. (If you are interested in Zeppelin, you can join the DingTalk
group: 32803524)

%flink.ssql(jobName="my job")

insert into sink_kafka select status, direction, cast(event_ts/10
as timestamp(3)) from source_kafka where status <> 'foo'

Evan  wrote on Sat, Jul 18, 2020 at 5:47 PM:

> The code is roughly like this: a Kafka source table and an ES sink
> table, executed at the end via tableEnv.executeSql("insert into
> esSinkTable select ... from kafkaSourceTable").
> After the job is submitted, the job name is
> "insert-into_someCatalog_someDatabase.someTable".
>
>
> This is not very user friendly. Can I specify the job name myself?



-- 
Best Regards

Jeff Zhang


How to clear ValueState in ProcessAllWindowFunction

2020-07-18 Thread ゞ野蠻遊戲χ
Dear all:

How do I clear a ValueState used in ProcessAllWindowFunction? Unlike
KeyedProcessFunction, there is no onTimer method in ProcessAllWindowFunction.


Thanks

How do I trigger clearing custom state in ProcessWindowFunction

2020-07-18 Thread ゞ野蠻遊戲χ
Dear all:


How do I clear custom state in ProcessWindowFunction? Because there is no 
onTimer method in ProcessAllWindowFunction.


Thanks
Jiazhi

Re: Flink Sinks

2020-07-18 Thread David Anderson
Prasanna,

The Flink project does not have an SQS connector, and a quick google search
hasn't found one. Nor does Flink have an HTTP sink, but with a bit of
googling you can find that various folks have implemented this themselves.

As for implementing SQS as a custom sink, if you need exactly once
guarantees I believe you will need to limit yourself to using SQS FIFO
queues, and you'll need to attach a MessageDeduplicationId to each record.
SQS offers to do deduplication for you, but only supports deduplication
within a 5 minute window -- which I would find too short to rely on during
recovery or redeployment.

See https://stackoverflow.com/a/28111986/2000823 for more on exactly once
with SQS.
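
To make that concrete, here is a sketch of such a custom sink (not an
official connector; the queue URL, the single message group, and hashing
the message body to build the deduplication id are all assumptions made
for illustration):

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class SqsFifoSink extends RichSinkFunction<String> {

    private final String queueUrl;  // must point at a FIFO (.fifo) queue
    private transient AmazonSQS sqs;

    public SqsFifoSink(String queueUrl) {
        this.queueUrl = queueUrl;
    }

    @Override
    public void open(Configuration parameters) {
        sqs = AmazonSQSClientBuilder.defaultClient();
    }

    @Override
    public void invoke(String value, Context context) {
        sqs.sendMessage(new SendMessageRequest()
                .withQueueUrl(queueUrl)
                .withMessageBody(value)
                // one message group keeps ordering simple, at the cost
                // of per-group throughput limits
                .withMessageGroupId("flink")
                // deterministic id: replaying the same record during
                // recovery produces the same id, so SQS drops the
                // duplicate -- but only within its 5-minute window
                .withMessageDeduplicationId(DigestUtils.sha256Hex(value)));
    }

    @Override
    public void close() {
        if (sqs != null) {
            sqs.shutdown();
        }
    }
}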

Regards,
David

On Fri, Jul 17, 2020 at 9:55 PM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Hi,
>
> I did not find an out-of-the-box Flink sink connector for HTTP or SQS.
>
> Has anyone implemented one?
> I wanted to know: if we write a custom sink function, would it
> affect exactly-once semantic guarantees?
>
>
> Thanks,
> Prasanna
>


Re: Flink FsStatebackend is giving better performance than RocksDB

2020-07-18 Thread David Anderson
You should be able to tune your setup to avoid the OOM problem you have run
into with RocksDB. It will grow to use all of the memory available to it,
but shouldn't leak. Perhaps something is misconfigured.

As for performance, with the FSStateBackend you can expect:

* much better throughput and average latency
* possibly worse worst-case latency, due to GC pauses

Large, multi-slot TMs can be more of a problem with the FSStateBackend
because of the increased scope for GC. You may want to run with more TMs,
each with fewer slots.

You'll also be giving up the possibility of taking advantage of quick,
local recovery [1] that comes for free when using incremental checkpoints
with RocksDB. Local recovery can still be used with the FSStateBackend, but
it comes at more of a cost.
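
For reference, task-local recovery is switched on via flink-conf.yaml:

state.backend.local-recovery: true

With a heap-based backend, the extra cost is that each checkpoint is also
written out to local disk, alongside the copy sent to the distributed
filesystem.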

As for migrating your state, you may be able to use the State Processor API
[2] to rewrite a savepoint taken from RocksDB so that it can be loaded by
the FSStateBackend, so long as you aren't using any windows or
ListCheckpointed state.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
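
To make that concrete, here is a rough sketch for a job with a single
keyed ValueState (the uid, paths, state name, POJO, and the max
parallelism of 128 are all made up for illustration; every stateful
operator in the job needs the same treatment):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class MigrateSavepoint {

    // carrier for one key's state
    public static class KeyedCount {
        public String key;
        public long count;
    }

    // reads the "count" ValueState out of the old savepoint
    public static class CountReader
            extends KeyedStateReaderFunction<String, KeyedCount> {
        ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx,
                            Collector<KeyedCount> out) throws Exception {
            KeyedCount kc = new KeyedCount();
            kc.key = key;
            kc.count = state.value();
            out.collect(kc);
        }
    }

    // writes the same ValueState into the new savepoint
    public static class CountBootstrapper
            extends KeyedStateBootstrapFunction<String, KeyedCount> {
        ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(KeyedCount value, Context ctx)
                throws Exception {
            state.update(value.count);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

        // load the old savepoint with the backend that wrote it
        ExistingSavepoint old = Savepoint.load(bEnv,
                "hdfs:///savepoints/rocksdb-sp",
                new RocksDBStateBackend("hdfs:///checkpoints"));

        DataSet<KeyedCount> counts =
                old.readKeyedState("my-operator-uid", new CountReader());

        BootstrapTransformation<KeyedCount> bootstrap = OperatorTransformation
                .bootstrapWith(counts)
                .keyBy(kc -> kc.key)
                .transform(new CountBootstrapper());

        // write a new savepoint in a format the FsStateBackend can load
        Savepoint.create(new FsStateBackend("hdfs:///checkpoints"), 128)
                .withOperator("my-operator-uid", bootstrap)
                .write("hdfs:///savepoints/fs-sp");

        bEnv.execute("migrate savepoint");
    }
}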


Best,
David

On Fri, Jul 17, 2020 at 1:53 PM Vijay Bhaskar 
wrote:

> Hi
> While doing scale testing we observed that the FSStateBackend is
> outperforming RocksDB.
> When using RocksDB, off-heap memory keeps growing over a period of time,
> and after a day the pod got terminated with an OOM.
>
> With the same data pattern, the FSStateBackend has been running for days
> without any memory spike or OOM.
>
> Based on the documentation, this is what I understood:
>
> When the state size is so huge that we can't keep it in memory, RocksDB
> is preferred. That indirectly means that when the FSStateBackend performs
> poorly as the state size grows, RocksDB is preferred, right?
>
> Another question: we have run production using RocksDB for quite some
> time; if we switch to the FSStateBackend, what are the consequences?
> The following is what I can think of:
> The very first time, I'll lose the state.
> Thereafter, the behavior w.r.t. savepoints and checkpoints is the same (I
> know there is no incremental checkpointing, but that's a performance
> matter).
>
> Other than that, are there any issues?
>
> Regards
> Bhaskar
>
>


Re: Backpressure on Window step

2020-07-18 Thread David Anderson
Steve,

Your approach to debugging this sounds reasonable, but keep in mind that
the backpressure detection built into the WebUI is not infallible. You
could have backpressure that it doesn't detect.

FWIW, keyBy isn't an operator -- it's a declaration of how the operators
before and after the keyBy are connected.

Have you tried increasing the parallelism of the task(s) before the window
(where the parallelism is currently 4)? Given that your job is already
using 32 slots, you have little to lose by doing so. Perhaps the keyBy (and
associated change in parallelism) is the first point in your job where the
events are being serialized and sent over the network, and maybe 4
instances aren't enough to consistently provide the required throughput.
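
As a sketch (operator and field names assumed), that change would look
something like:

DataStream<Event> fanned = events
        .flatMap(new FanOutFunction())
        .setParallelism(16);   // was 4

fanned.keyBy(event -> event.batchKey)
        .window(EventTimeSessionWindows.withGap(Time.seconds(30)))
        .aggregate(new BatchAggregator(), new EmitBatchesFunction());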

David

On Fri, Jul 17, 2020 at 11:17 PM Nelson Steven 
wrote:

> First off, thanks for your reply!
>
>
>
> I have an assumption that I should probably verify first:
>
> When determining the source of the backpressure we look (in the WebUI) for
> the first operator in our pipeline that is not showing backpressure. That’s
> what we consider to be the source of the backpressure
>
>
>
> In this case the first operator in our graph that is not showing
> backpressure is our window operator (although the keyBy operation right
> before it doesn’t show up in the graph). The window function uses a custom
> aggregation function that builds up a hashmap and a custom process function
> that emits the hashmap and performs some metrics operations. I am not sure
> how this would generate backpressure since it doesn’t perform any IO, but
> again I might be drawing incorrect conclusions.
>
>
>
> The window function has a parallelism of 32. Each of the subtasks has
> between 136 KB and 2.45 MB of state, with a checkpoint duration of 280 ms
> to 2 seconds. Each of the 32 subtasks appears to be handling 1,700-50,000
> records an hour, with bytes received between 7 MB and 170 MB.
>
>
>
> Am I barking up the wrong tree?
>
>
>
> -Steve
>
>
>
>
>
> *From:* David Anderson 
> *Sent:* Friday, July 17, 2020 6:54 AM
> *To:* Nelson Steven 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Backpressure on Window step
>
>
>
> Backpressure is typically caused by something like one of these things:
>
>
>
> * problems relating to i/o to external services (e.g., enrichment via an
> API or database lookup, or a sink)
>
> * data skew (e.g., a hot key)
>
> * under-provisioning, or competition for resources
>
> * spikes in traffic
>
> * timer storms
>
>
>
> I would start to debug this by looking for signs of significant
> asymmetry in the metrics (across the various subtasks), or resource
> exhaustion. Could be related to the network, GC, CPU, disk i/o, etc.
> Flink's webUI will show you checkpoint size and timing information for each
> sub-task; you can learn a lot from studying that data.
>
>
>
> Relating to session windows -- could you occasionally have an unusually
> long session, and might that cause problems?
>
>
>
> Best,
> David
>
>
>
> On Tue, Jul 14, 2020 at 10:12 PM Nelson Steven 
> wrote:
>
> Hello!
>
>
>
> We are experiencing occasional backpressure on a Window function in our
> pipeline. The window is on a KeyedStream and is triggered by an
> EventTimeSessionWindows.withGap(Time.seconds(30)). The prior step does a
> fanout and we use the window to sort things into batches based on the Key
> for the keyed stream. We aren’t seeing an unreasonable amount of records
> (500-600/s) on a parallism of 32 (prior step has a parallelism of 4). We
> are as interested in learning out to debug the issue as we are in fixing
> the actual problem. Any ideas?
>
>
>
> -Steve
>
>


FlinkSQL job name issue after job submission

2020-07-18 Thread Evan
The code is roughly like this: a Kafka source table and an ES sink table,
executed at the end via tableEnv.executeSql("insert into
esSinkTable select ... from kafkaSourceTable").
After the job is submitted, the job name is
"insert-into_someCatalog_someDatabase.someTable".


This is not very user friendly. Can I specify the job name myself?