Re: Backpressure and 99th percentile latency

2020-03-05 Thread Arvid Heise
Hi Felipe, latency under backpressure has to be carefully interpreted. Latency's semantics actually require that the data source is read in a timely manner; that is, there is no bottleneck in your pipeline where data is piling up. Thus, to measure latency in experiments you must ensure that the

Re: Backpressure and 99th percentile latency

2020-03-05 Thread Zhijiang
Hi Felipe, I'll try to answer your questions below. > I understand that I am tracking latency every 10 seconds for each physical > instance operator. Is that right? Generally right. The latency marker is emitted from the source and flows through all the intermediate operators until the sink. This interval
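
For anyone reproducing this, a minimal sketch of the per-job equivalent of `metrics.latency.interval: 10000` (the local setup is an assumption, not from the thread):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Emit a LatencyMarker from each source every 10 s; the granularity
    // (metrics.latency.granularity: operator by default) is set in flink-conf.yaml.
    env.getConfig().setLatencyTrackingInterval(10_000);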

Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
Actually this use case led me to start thinking about one question: if watermarks are enabled, could we also support GROUP BY event_time instead of forcing the user to define a window based on the event_time? GROUP BY a standalone event_time can also be treated as a special window, which has both

RE: Re: Re: Teradata as JDBC Connection

2020-03-05 Thread Norm Vilmer (Contractor)
Thanks Jark. I'll try removing the inAppendMode() ☺ Regarding the Teradata dialect, crossing my fingers and hoping insert queries work. From: Jark Wu Sent: Thursday, March 5, 2020 8:41 PM To: Norm Vilmer (Contractor) Cc: Arvid Heise ; user@flink.apache.org Subject: EXTERNAL - Re: Re: Teradata

Re: Re: Teradata as JDBC Connection

2020-03-05 Thread Jark Wu
Hi Norm, Here is the documentation for the JDBC connector; you can find the supported properties there: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector Regarding your exception, you don't need to call `inAppendMode`. The JDBC sink supports both append-mode

Re: JobMaster does not register with ResourceManager in high availability setup

2020-03-05 Thread Xintong Song
Hi Abhinav, Thanks for the log. However, the attached log seems to be incomplete. The NoResourceAvailableException cannot be found in this log. Regarding connecting to ResourceManager, the log suggests that: - ZK was back to life and connected at 06:29:56. 2020-02-27 06:29:56.539

Re: Writing retract streams to Kafka

2020-03-05 Thread Jark Wu
Hi Gyula, Does tumbling 5 seconds for aggregation meet your need? For example: INSERT INTO QueryResult SELECT q.queryId, t.itemId, TUMBLE_START(q.event_time, INTERVAL '5' SECOND), sum(t.quantity) AS quantity FROM ItemTransactions AS t, Queries AS q WHERE t.itemId = q.itemId AND

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Jeff Zhang
There are 2 kinds of configuration: job level & cluster level. I am afraid we don't have documentation differentiating them; it depends on how users understand these configurations. We may need to improve the documentation on that. Kurt Young wrote on Fri, Mar 6, 2020 at 8:34 AM: > If you already have a running flink cluster

Re: Single stream, two sinks

2020-03-05 Thread Austin Cawley-Edwards
We have the same setup and it works quite well. One thing to take into account is that your HTTP call may happen multiple times if you're using the checkpointing / fault tolerance mechanism, so it's important that those calls are idempotent and won't duplicate data. Also we've found that it's
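
A minimal sketch of what an idempotent call can look like: an HTTP PUT keyed by the record, so replays overwrite rather than duplicate (the endpoint and the hash-based key are made-up examples, not Austin's actual setup):

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class IdempotentHttpSink extends RichSinkFunction<String> {
        @Override
        public void invoke(String record, Context context) throws Exception {
            // PUT to a key derived from the record: if the record is replayed
            // after a failure/restore, the same resource is overwritten.
            URL url = new URL("https://example.com/items/" + record.hashCode());
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("PUT");
            conn.setDoOutput(true);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(record.getBytes(StandardCharsets.UTF_8));
            }
            if (conn.getResponseCode() >= 300) {
                throw new RuntimeException("HTTP call failed: " + conn.getResponseCode());
            }
        }
    }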

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Kurt Young
If you already have a running flink cluster and you want to submit another job to this cluster, then none of the configurations relating to process parameters, like TM memory, slot number, etc., can be modified. Best, Kurt On Thu, Mar 5, 2020 at 11:08 PM Gyula Fóra wrote: > Kurt can you please

Re: Single stream, two sinks

2020-03-05 Thread Gadi Katsovich
Guys, thanks for the great advice. It works! I used HttpAsyncClient from Apache Commons. At first I tried to implement the async http client by implementing AsyncFunction. I implemented the asyncInvoke method and used try-with-resource to instantiate the client (because it's
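
For the archive, a sketch of the open()/close() pattern that avoids the try-with-resource pitfall (class and endpoint names are made up; this is not Gadi's actual code):

    import java.util.Collections;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.apache.http.HttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.concurrent.FutureCallback;
    import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
    import org.apache.http.impl.nio.client.HttpAsyncClients;

    public class HttpLookupFunction extends RichAsyncFunction<String, Integer> {
        private transient CloseableHttpAsyncClient client;

        @Override
        public void open(Configuration parameters) {
            // Create the client once per operator instance, not per record.
            client = HttpAsyncClients.createDefault();
            client.start();
        }

        @Override
        public void asyncInvoke(String key, ResultFuture<Integer> resultFuture) {
            client.execute(new HttpGet("https://example.com/lookup/" + key),
                    new FutureCallback<HttpResponse>() {
                        @Override
                        public void completed(HttpResponse response) {
                            resultFuture.complete(Collections.singleton(
                                    response.getStatusLine().getStatusCode()));
                        }

                        @Override
                        public void failed(Exception ex) {
                            resultFuture.completeExceptionally(ex);
                        }

                        @Override
                        public void cancelled() {
                            resultFuture.complete(Collections.emptyList());
                        }
                    });
        }

        @Override
        public void close() throws Exception {
            client.close(); // closed once, when the operator shuts down
        }
    }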

Backpressure and 99th percentile latency

2020-03-05 Thread Felipe Gutierrez
Hi, I am a bit confused about the topic of tracking latency in Flink [1]. It says that if I use latency tracking I am measuring Flink's network stack, but application code latencies can also influence it. For instance, if I am using the metrics.latency.granularity: operator (default) and

Re: Weird behaviour testing TwoPhaseCommit

2020-03-05 Thread David Magalhães
Awesome Arvid, thanks a lot! :) And I thought when doing this that I was simplifying the test ... On Thu, Mar 5, 2020 at 8:27 PM Arvid Heise wrote: > Hi David, > > bounded sources do not work well with checkpointing. As soon as the source > is drained, no checkpoints are performed anymore.

Re: Weird behaviour testing TwoPhaseCommit

2020-03-05 Thread Arvid Heise
Hi David, bounded sources do not work well with checkpointing. As soon as the source is drained, no checkpoints are performed anymore. It's an unfortunate limitation that we want to get rid of, but haven't found the time (because it requires larger changes). So for your test to work, you need to
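
A sketch of one way to keep a test source alive until checkpoints have run (the element count and the sleep are arbitrary):

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class FiniteButOpenSource implements SourceFunction<Long> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            for (long i = 0; i < 100 && running; i++) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(i);
                }
            }
            // Stay "running" after the data is emitted so checkpoints keep
            // firing; if the source exits, no further checkpoints (and thus
            // no 2PC commits) happen.
            while (running) {
                Thread.sleep(100);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }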

Re: Very large _metadata file

2020-03-05 Thread Jacob Sevart
Thanks, I will monitor that thread. I'm having a hard time following the serialization code, but if you know anything about the layout, tell me if this makes sense. What I see in the hex editor is, first, many HDFS paths. Then gigabytes of unreadable data. Then finally another HDFS path at the

Re: Source parallelism problem when reading Hive with Flink 1.10.0

2020-03-05 Thread Bowen Li
@JingsongLee Please add the current Hive sink parallelism configuration strategy to the documentation: https://issues.apache.org/jira/browse/FLINK-16448 On Tue, Mar 3, 2020 at 9:31 PM Jun Zhang <825875...@qq.com> wrote: > Right. Actually I think the example SQL I wrote is a very widely used one: after creating a Hive table and importing data, one usually runs a similar SQL to check that the table was created correctly and the data is right. > On Mar 4, 2020 at 13:25, JingsongLee

Weird behaviour testing TwoPhaseCommit

2020-03-05 Thread David Magalhães
I've implemented a CustomSink with TwoPhaseCommit. To test this I've created a test based on this [1] one, and it works fine. To test the integration with S3 (and with an exponential back off), I've tried to implement a new test, using the following code: ... val invalidWriter =

Re: Very large _metadata file

2020-03-05 Thread Kostas Kloudas
Hi Jacob, As I said previously, I am not 100% sure what may be causing this behavior, but there is a related thread here: https://lists.apache.org/thread.html/r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d%40%3Cuser.flink.apache.org%3E where you can re-post your problem and monitor for

Re: StreamingFileSink Not Flushing All Data

2020-03-05 Thread Kostas Kloudas
Thanks Austin, If the CompressionWriterFactory works for you in 1.10, then you can copy it as is in 1.9 and use it. The BulkWriter interfaces have not changed between the versions (as far as I recall). But please keep in mind that there is a bug in the CompressWriterFactory with a pending PR that

Re: Flink Deployment failing with RestClientException

2020-03-05 Thread Andrey Zagrebin
Hi Samir, It may be a known issue [1][2] where some action during job submission takes too long but eventually completes in the job manager. Have you checked the job manager logs for any other failures besides "Ask timed out"? Have you checked in the Web UI whether all the jobs have been

How do I get the value of 99th latency inside an operator?

2020-03-05 Thread Felipe Gutierrez
Hi community, where in the Flink code can I get the value of the 99th percentile latency (flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{operator_id="93352199ce18d8917f20fdf82cedb1b4",quantile="0.99"})? Probably I will have to hack the Flink source code to

Flink Deployment failing with RestClientException

2020-03-05 Thread Samir Tusharbhai Chauhan
Hi, I am having an issue where, after deploying a few jobs, deployment starts failing with the errors below. I don't have this issue in other environments. What should I check first in such a scenario? My environment is Azure Kubernetes 1.15.7 Flink 1.6.0 Zookeeper 3.4.10 The program finished with the following

RE: Re: Teradata as JDBC Connection

2020-03-05 Thread Norm Vilmer (Contractor)
Thanks for the reply, Arvid. I changed the property names in my ConnectorDescriptor subclass to match what the validator wanted and now get: “Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath. Reason: No factory supports all

Re: Setting the operator-id to measure percentile latency over several jobs

2020-03-05 Thread Felipe Gutierrez
thanks! I was wondering why the operator name is not implemented for the latency metrics, because for the other metrics it is implemented. But thanks anyway! -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com

Re: checkpoint _metadata file has >20x different in size among different check-points

2020-03-05 Thread Congxian Qiu
Hi, maybe the checkpoint contains some ByteStreamStateHandles; if you want to verify this, you can configure `state.backend.fs.memory-threshold`. Please be careful when setting this config, because it may produce many files of small size. Best, Congxian Arvid Heise
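
For reference, a sketch of the same threshold set programmatically on the state backend (env being the job's StreamExecutionEnvironment; the path and the 1024-byte value, which is the default, are just examples):

    import java.net.URI;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;

    // Second argument is the file state size threshold in bytes, the same knob
    // as state.backend.fs.memory-threshold: state below it is inlined into the
    // checkpoint _metadata file as ByteStreamStateHandles, state above it is
    // written as separate files on the DFS.
    env.setStateBackend(new FsStateBackend(URI.create("hdfs:///flink/checkpoints"), 1024));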

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Gyula Fóra
Kurt, can you please explain which conf parameters you mean? In regular executions (Yarn for instance) we have dynamic config parameters overriding any flink-conf argument. So it is not about setting them in the user code, but it should happen before the ClusterDescriptors are created (i.e. in

Re: Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
I see, maybe I just don't understand how to properly express what I am trying to compute. Basically I want to aggregate the quantities of the transactions that happened in the 5 seconds before the query. Every query.id belongs to a single query (event_time, itemId) but still I have to group :/

Re: java.time.LocalDateTime in POJO type

2020-03-05 Thread KristoffSC
Thanks, do you have any example of how I could use it? Basically I have a POJO class that has a LocalDateTime field in it. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How to use self defined json format when create table from kafka stream?

2020-03-05 Thread Kurt Young
User defined formats also sounds like an interesting extension. Best, Kurt On Thu, Mar 5, 2020 at 3:06 PM Jark Wu wrote: > Hi Lei, > > Currently, Flink SQL doesn't support to register a binlog format (i.e. > just define "order_id" and "order_no", but the json schema has other binlog >

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Kurt Young
IIRC the tricky thing here is that not all of the config options belonging in flink-conf.yaml can be adjusted dynamically in the user's program. So it would end up with some of the configurations being overridable but others not, which is not a great experience for users. Best, Kurt On Thu, Mar 5, 2020 at

Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
I think the issue is not caused by event time interval join, but the aggregation after the join: GROUP BY t.itemId, q.event_time, q.queryId; In this case, there is still no chance for Flink to determine whether the groups like (itemId, eventtime, queryId) have complete data or not. As a

Re: Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
That's exactly the kind of behaviour I am looking for Kurt ("ignore all delete messages"). As for the data completion, in my above example it is basically an event time interval join. With watermarks defined Flink should be able to compute results once in exactly the same way as for the tumbling

Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
Back to this case, I assume you are expecting something like an "ignore all delete messages" flag? With this flag turned on, Flink will only send insert messages corresponding to the current correct results to Kafka and drop all retractions and deletes on the fly. Best, Kurt On Thu, Mar 5, 2020 at
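
Until such a flag exists, one way to approximate it outside SQL is to convert the result to a retract stream and drop the delete messages by hand before the Kafka sink; a minimal sketch, assuming `tableEnv` is a StreamTableEnvironment and the query and topic names are placeholders:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;

    Table result = tableEnv.sqlQuery(
        "SELECT itemId, SUM(quantity) AS quantity FROM ItemTransactions GROUP BY itemId");

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");

    tableEnv.toRetractStream(result, Row.class)
            .filter(change -> change.f0)            // true = insert/update; false = retract/delete
            .map(change -> change.f1.toString())
            .returns(Types.STRING)
            .addSink(new FlinkKafkaProducer<>(
                    "query.output.log.1", new SimpleStringSchema(), props));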

Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
> I also don't completely understand at this point why I can write the result of a group, tumble window aggregate to Kafka and not this window join / aggregate. If you are doing a tumble window aggregate with watermark enabled, Flink will only fire a final result for each window at once, no
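
In other words, a windowed variant of the aggregation produces a pure append stream that the Kafka sink accepts; a sketch reusing the table names from Gyula's example, assuming the same `tableEnv` and that the sink table's schema matches:

    tableEnv.sqlUpdate(
        "INSERT INTO QueryResult " +
        "SELECT t.itemId, TUMBLE_START(t.event_time, INTERVAL '5' SECOND), SUM(t.quantity) " +
        "FROM ItemTransactions AS t " +
        "GROUP BY t.itemId, TUMBLE(t.event_time, INTERVAL '5' SECOND)");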

Re: Rocksdb Serialization issue

2020-03-05 Thread David Morin
Yes Arvid, the sink is keyed by a String dbName::tableName. The input is Kafka, but to init the state we have to read Hive delta files before consuming Kafka records. These are ORC files we have to read to init the state, with one directory per table. A key (primary key) is only in one bucket file.

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Jeff Zhang
Hi Gyula, I am working on integrating Flink with Zeppelin. One feature in Zeppelin is that users can override any setting in flink-conf.yaml (actually any setting here: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html). Of course you can run flink sql in Zeppelin, and

Re: Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
Thanks Benoît! I can see now how I can implement this myself through the provided sink interfaces but I was trying to avoid having to write code for this :D My initial motivation was to see whether we are able to write out any kind of table to Kafka as a simple stream of "upserts". I also don't

Re: Writing retract streams to Kafka

2020-03-05 Thread Benoît Paris
Hi Gyula, I'm afraid the conversion to see retractions vs inserts can't be done in pure SQL (though I'd love that feature). You might want to go lower level and implement a RetractStreamTableSink [1][2] that you would wrap around a KafkaTableSink [3]. This will give you a

Re: Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
Hi Roman, This is the core logic: CREATE TABLE QueryResult ( queryId BIGINT, itemId STRING, quantity INT ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'query.output.log.1', 'connector.properties.bootstrap.servers' = '', 'format.type'

Re: Writing retract streams to Kafka

2020-03-05 Thread Khachatryan Roman
Hi Gyula, Could you provide the code of your Flink program, the error with stacktrace and the Flink version? Thanks, Roman On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra wrote: > Hi All! > > Excuse my stupid question, I am pretty new to the Table/SQL API and I am > trying to play around with it

Re: Rocksdb Serialization issue

2020-03-05 Thread Arvid Heise
Hi David, could you please explain what you are actually trying to achieve? It seems like you are reading some files from S3 in SinkFunction#open and putting them into state (bootstrapping?). How many instances of the sink are executed? How do you shard the buckets / e.g. how do you avoid reading

Re: Rocksdb Serialization issue

2020-03-05 Thread David Morin
Hello Arvid, After some investigations with the help of my colleague we finally found the root cause. In order to improve the init of the state, I've created some threads to parallelize the read of bucket files. This is a temporary solution because I've planned to use the State Processor API.

Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
Hi All! Excuse my stupid question, I am pretty new to the Table/SQL API and I am trying to play around with it implementing and running a few use-cases. I have a simple window join + aggregation, grouped on some id that I want to write to Kafka but I am hitting the following error:

Re: Setting the operator-id to measure percentile latency over several jobs

2020-03-05 Thread Khachatryan Roman
Hi Felipe, Please find the answers to your questions below. > Each "operator_subtask_index" means each instance of the parallel physical operator, doesn't it? Yes. > How can I set a fixed ID for the "operator_id" in my code so I can identify quickly which operator I am measuring? You are using
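
A sketch of pinning the operator's identity with an explicit uid, so the hashed operator_id in the metric name stays stable across job submissions (the function and uid names are made up):

    stream
        .map(new EnrichFunction())
        .uid("enrich-map")      // stable ID: drives the operator_id hash used for metrics and state
        .name("Enrich Map");    // display name shown in the Web UI and non-latency metric scopes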

Re: How to test flink job recover from checkpoint

2020-03-05 Thread Bajaj, Abhinav
I implemented a custom function that throws a runtime exception. You can extend from the simpler MapFunction or the more complicated RichParallelSourceFunction depending on your use case. You can add logic to throw a runtime exception on a certain condition in the map or run method.
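
A minimal sketch of such a failure-injecting function (the "poison" trigger is just an example; the static flag assumes a single-JVM MiniCluster test):

    import org.apache.flink.api.common.functions.MapFunction;

    public class FailOnceMap implements MapFunction<String, String> {
        // Survives the task restart within the same JVM, so the job
        // fails exactly once and then recovers from the last checkpoint.
        private static volatile boolean failedOnce = false;

        @Override
        public String map(String value) {
            if (!failedOnce && value.contains("poison")) {
                failedOnce = true;
                throw new RuntimeException("Injected failure to test checkpoint recovery");
            }
            return value;
        }
    }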

Setting the operator-id to measure percentile latency over several jobs

2020-03-05 Thread Felipe Gutierrez
Hi community, I am tracking the latency of operators in Flink according to this reference [1]. When I am using Prometheus+Grafana I can issue a query using "flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency" and I can check the percentiles of each "operator_id"

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Gyula Fóra
I could basically list a few things I want to set (execution.target for example), but it's fair to assume that I would like to be able to set anything :) Gyula On Thu, Mar 5, 2020 at 11:35 AM Jingsong Li wrote: > Hi Gyula, > > Maybe Blink planner has invoked

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Jingsong Li
Hi Gyula, Maybe the Blink planner has invoked "StreamExecutionEnvironment.configure"; which planner do you use? But "StreamExecutionEnvironment.configure" is only for partial configuration; it cannot cover all the configuration in flink-conf.yaml. So which config do you want to set? I know some config

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Jark Wu
Hi Gyula, Flink configurations can be overridden via `TableConfig#getConfiguration()`; however, the SQL CLI only allows setting Table-specific configs. I consider this a bug/improvement of the SQL CLI which should be fixed in 1.10.1. Best, Jark On Thu, 5 Mar 2020 at 18:12, Gyula Fóra wrote: > Thanks
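
For the archive, a sketch of the programmatic override Jark describes (the mini-batch keys are just example options):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
    // Entries set here override what came from flink-conf.yaml for this job.
    tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");
    tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.allow-latency", "5 s");
    tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.size", "5000");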

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Gyula Fóra
Thanks Caizhi, This seems like a pretty big shortcoming for any multi-user/multi-app environment. I will open a jira for this. Gyula On Thu, Mar 5, 2020 at 10:58 AM Caizhi Weng wrote: > Hi Gyula. > > I'm afraid there is no way to override all Flink configurations currently. > SQL client yaml

Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-05 Thread Yang Wang
Hi Andrey, Thanks for driving this significant FLIP. From the user ML, we can also see that there are many users running Flink in container environments, so the docker image is a very basic requirement. Just as you say, we should provide a unified place for all the various usages (e.g. session,

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Caizhi Weng
Hi Gyula. I'm afraid there is no way to override all Flink configurations currently. The SQL client yaml file can only override some of the Flink configurations. Configuration entries indeed can only set Table-specific configs, while deployment entries are used to set the result fetching address and

How to override flink-conf parameters for SQL Client

2020-03-05 Thread Gyula Fóra
Hi All! I am trying to understand if there is any way to override flink configuration parameters when starting the SQL Client. It seems that the only way to pass any parameters is through the environment yaml. There I found 2 possible routes: configuration: this doesn't work as it only sets