Hi Felipe,
Latency under backpressure has to be interpreted carefully. The semantics of
latency actually require that the data source is read in a timely manner;
that is, that there is no bottleneck in your pipeline where data piles up.
Thus, to measure latency in experiments you must ensure that the
Hi Felipe,
I'll try to answer your questions below.
> I understand that I am tracking latency every 10 seconds for each physical
> instance operator. Is that right?
Generally right. The latency marker is emitted from the source and flows through
all the intermediate operators until the sink. This interval
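(For reference, a minimal sketch of setting that interval per job via the
DataStream API; cluster-wide, the metrics.latency.interval key does the same:)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Emit a latency marker from every source every 10 seconds.
env.getConfig().setLatencyTrackingInterval(10_000L);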
Actually, this use case leads me to start thinking about one question:
if watermarks are enabled, could we also support GROUP BY event_time instead
of forcing the user to define a window based on the event_time?
A GROUP BY on a standalone event_time can also be treated as a special window,
which has
both
Thanks Jark. I’ll try removing the inAppendMode() ☺
Regarding the Teradata dialect, I'm crossing my fingers and hoping that insert
queries work.
From: Jark Wu
Sent: Thursday, March 5, 2020 8:41 PM
To: Norm Vilmer (Contractor)
Cc: Arvid Heise ; user@flink.apache.org
Subject: EXTERNAL - Re: Re: Teradata
Hi Norm,
Here is the documentation for the JDBC connector; you can find the supported
properties there:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector
Regarding your exception, you don't need to call `inAppendMode`. The JDBC
sink supports both append-mode
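For reference, a sketch of declaring a JDBC sink table with those properties
(tableEnv is an existing TableEnvironment; URL, table name, and credentials are
placeholders, and dialect support for specific databases may vary):

tableEnv.sqlUpdate(
    "CREATE TABLE jdbcSink (" +
    "  id BIGINT," +
    "  msg VARCHAR" +
    ") WITH (" +
    "  'connector.type' = 'jdbc'," +
    "  'connector.url' = 'jdbc:mysql://localhost:3306/mydb'," +
    "  'connector.table' = 'my_table'," +
    "  'connector.username' = 'user'," +
    "  'connector.password' = 'pass'" +
    ")");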
Hi Abhinav,
Thanks for the log. However, the attached log seems to be incomplete.
The NoResourceAvailableException cannot be found in this log.
Regarding connecting to the ResourceManager, the log suggests that:
- ZK came back to life and connected at 06:29:56.
2020-02-27 06:29:56.539
Hi Gyula,
Does a 5-second tumbling window aggregation meet your need? For example:
INSERT INTO QueryResult
SELECT
  q.queryId,
  t.itemId,
  TUMBLE_START(q.event_time, INTERVAL '5' SECOND),
  SUM(t.quantity) AS quantity
FROM
  ItemTransactions AS t,
  Queries AS q
WHERE
  t.itemId = q.itemId AND
There are two kinds of configuration: job level and cluster level. I am afraid
we don't have documentation that differentiates the two; it depends on how
users understand these configurations. We may need to improve the
documentation on that.
Kurt Young wrote on Fri, Mar 6, 2020 at 8:34 AM:
> If you already have a running flink cluster
We have the same setup and it works quite well. One thing to take into
account is that your HTTP call may happen multiple times if you’re using
the checkpointing / fault-tolerance mechanism, so it’s important that those
calls are idempotent and won’t duplicate data.
Also, we’ve found that it’s
If you already have a running Flink cluster and you want to submit another job
to this cluster, then none of the configurations
related to process parameters, such as TM memory or slot number, can be
modified.
Best,
Kurt
On Thu, Mar 5, 2020 at 11:08 PM Gyula Fóra wrote:
> Kurt can you please
Guys, thanks for the great advice. It works!
I used HttpAsyncClient from Apache HttpComponents.
At first I tried to implement the async HTTP client by implementing
AsyncFunction. I implemented the asyncInvoke method and used
try-with-resources to instantiate the client (because it's
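For reference, a minimal sketch of the pattern that ends up working: create the
client once in open() and close it in close(), rather than wrapping it in
try-with-resources inside asyncInvoke (there the client would be closed before
the async callback fires). Class name and endpoint are illustrative:

import java.util.Collections;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;

public class HttpLookupFunction extends RichAsyncFunction<String, String> {

    private transient CloseableHttpAsyncClient client;

    @Override
    public void open(Configuration parameters) {
        // One client per task instance, reused across all asyncInvoke calls.
        client = HttpAsyncClients.createDefault();
        client.start();
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        HttpGet request = new HttpGet("http://example.com/lookup?key=" + key);
        client.execute(request, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse response) {
                resultFuture.complete(
                        Collections.singleton(response.getStatusLine().toString()));
            }

            @Override
            public void failed(Exception ex) {
                resultFuture.completeExceptionally(ex);
            }

            @Override
            public void cancelled() {
                resultFuture.complete(Collections.emptyList());
            }
        });
    }

    @Override
    public void close() throws Exception {
        client.close();
    }
}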
Hi,
I am a bit confused about the topic of latency tracking in Flink [1]. It
says that if I use latency tracking I am measuring Flink’s network stack,
but application-code latencies can also influence it. For instance, if I am
using metrics.latency.granularity: operator (the default) and
Awesome Arvid, thanks a lot! :)
And I thought that by doing this I was simplifying the test ...
On Thu, Mar 5, 2020 at 8:27 PM Arvid Heise wrote:
> Hi David,
>
> bounded sources do not work well with checkpointing. As soon as the source
> is drained, no checkpoints are performed anymore.
Hi David,
bounded sources do not work well with checkpointing. As soon as the source
is drained, no checkpoints are performed anymore. It's an unfortunate
limitation that we want to get rid of, but haven't found the time (because
it requires larger changes).
So for your test to work, you need to
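One common workaround is a source that stays alive after emitting its bounded
test data, so checkpoints can still complete. A minimal sketch (names and data
are illustrative):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class NonTerminatingSource implements SourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        for (long i = 0; i < 100 && running; i++) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(i);
            }
        }
        // Idle instead of returning, so the job keeps running and
        // checkpoints continue to be triggered.
        while (running) {
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}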
Thanks, I will monitor that thread.
I'm having a hard time following the serialization code, but if you know
anything about the layout, tell me if this makes sense. What I see in the
hex editor is, first, many HDFS paths. Then gigabytes of unreadable data.
Then finally another HDFS path at the
@JingsongLee please add the current Hive sink parallelism configuration strategy to the documentation.
https://issues.apache.org/jira/browse/FLINK-16448
On Tue, Mar 3, 2020 at 9:31 PM Jun Zhang <825875...@qq.com> wrote:
>
> Yes. Actually, I think the example SQL I wrote should be very widely used: after creating a new Hive table and importing data, one usually runs a similar SQL to verify that the table was created correctly and that the data is correct.
> On 2020-03-04 13:25, JingsongLee
I've implemented a custom sink with two-phase commit. To test it I've created
a test modeled on this [1] one, and it works fine.
To test the integration with S3 (and with an exponential back-off), I've
tried to implement a new test, using the following code:
...
val invalidWriter =
Hi Jacob,
As I said previously, I am not 100% sure what could be causing this
behavior, but there is a related thread here:
https://lists.apache.org/thread.html/r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d%40%3Cuser.flink.apache.org%3E
where you can re-post your problem and monitor for
Thanks Austin,
If the CompressWriterFactory works for you in 1.10, then you can copy it
as-is into 1.9 and use it. The BulkWriter interfaces have not changed between
the versions (as far as I recall). But please keep in mind that there is a
bug in the CompressWriterFactory with a pending PR that
Hi Samir,
It may be a known issue [1][2] where some action during job submission takes
too long but eventually completes in the job manager.
Have you checked the job manager logs for any other failures besides
“Ask timed out"?
Have you checked in the Web UI whether all the jobs have been
Hi community,
where in the Flink code can I get the value of the 99th-percentile latency
(flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{operator_id="93352199ce18d8917f20fdf82cedb1b4",quantile="0.99"})?
Probably I will have to hack the Flink source code to
Hi,
I am having an issue where, after deploying a few jobs, they start failing with
the errors below. I don't have this issue in other environments. What should I
check first in such a scenario?
My environment is:
Azure Kubernetes 1.15.7
Flink 1.6.0
Zookeeper 3.4.10
The program finished with the following
Thanks for the reply, Arvid. I changed the property names in my
ConnectorDescriptor subclass to match what the validator wanted, and now I get:
“Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.
Reason: No factory supports all
Thanks! I was wondering why the operator name is not included in the
latency metrics, since it is included for the other metrics.
But thanks anyway!
*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
Hi,
Maybe the checkpoint contains some ByteStreamStateHandles; if you want to
verify this, you can configure `state.backend.fs.memory-threshold`.
Please be careful when setting this config, because it may produce many
small files.
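A sketch of trying that on a local test environment: state smaller than the
threshold is inlined into the checkpoint metadata as a ByteStreamStateHandle,
so setting it to 0 forces all state into separate files.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
// Default is 1024 bytes; 0 means nothing is inlined into the metadata.
conf.setString("state.backend.fs.memory-threshold", "0");
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(1, conf);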
Best,
Congxian
Arvid Heise
Kurt, can you please explain which conf parameters you mean?
In regular executions (YARN, for instance) we have dynamic config
parameters overriding any flink-conf argument.
So it is not about setting them in the user code; it should happen
before the ClusterDescriptors are created (i.e. in
I see; maybe I just don't understand how to properly express what I am
trying to compute.
Basically, I want to aggregate the quantities of the transactions that
happened in the 5 seconds before the query.
Every query.id belongs to a single query (event_time, itemid), but still I
have to group :/
Thanks,
do you have any example of how I could use it?
Basically I have a POJO class that has a LocalDateTime field in it.
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
User-defined formats also sound like an interesting extension.
Best,
Kurt
On Thu, Mar 5, 2020 at 3:06 PM Jark Wu wrote:
> Hi Lei,
>
> Currently, Flink SQL doesn't support registering a binlog format (i.e.
> just define "order_id" and "order_no", but the json schema has other binlog
>
IIRC, the tricky thing here is that not all the config options belonging to
flink-conf.yaml can be adjusted dynamically in the user's program.
So it would end up with some of the configurations being overridable but
some not, and that experience is not great for users.
Best,
Kurt
On Thu, Mar 5, 2020 at
I think the issue is caused not by the event-time interval join, but by the
aggregation after the join:
GROUP BY t.itemId, q.event_time, q.queryId;
In this case, there is still no way for Flink to determine whether the
groups like (itemId, event_time, queryId) have complete data or not.
As a
That's exactly the kind of behaviour I am looking for, Kurt ("ignore all
delete messages").
As for data completeness, my example above is basically an event-time
interval join.
With watermarks defined, Flink should be able to compute results once, in
exactly the same way as for the tumbling
Back to this case: I assume you are expecting something like an "ignore all
delete messages" flag? With this
flag turned on, Flink would only send insert messages corresponding to the
current correct results to Kafka, and
drop all retractions and deletes on the fly.
Best,
Kurt
On Thu, Mar 5, 2020 at
> I also don't completely understand at this point why I can write the
result of a group, tumble window aggregate to Kafka and not this window
join / aggregate.
If you are doing a tumble window aggregate with watermarks enabled, Flink
will fire only a single final result for
each window, no
Yes, Arvid, the sink is keyed by a String dbName::tableName.
The input is Kafka, but to initialize the state we have to read Hive delta
files before consuming the Kafka records. These are ORC files we have to
read to initialize the state, with one directory per table.
A key (primary key) is only in one bucket file.
Hi Gyula,
I am working on integrating Flink with Zeppelin. One feature of Zeppelin is
that the user can override any setting in flink-conf.yaml (actually, any
setting here:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html).
Of course you can run Flink SQL in Zeppelin, and
Thanks Benoît!
I can see now how I can implement this myself through the provided sink
interfaces, but I was trying to avoid having to write code for this :D
My initial motivation was to see whether we are able to write out any kind
of table to Kafka as a simple stream of "upserts".
I also don't
Hi Gyula,
I'm afraid the conversion to see retractions vs. inserts can't be done in
pure SQL (though I'd love that feature).
You might want to go lower-level and implement a RetractStreamTableSink
[1][2] that you would wrap around a KafkaTableSink [3]. This will give you
a
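For what it's worth, a rough sketch of such a wrapper (Flink 1.10-era
interfaces; the wrapped producer and the schema handling are simplified
assumptions, not a tested implementation):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

public class UpsertOnlyKafkaSink implements RetractStreamTableSink<Row> {

    private final TableSchema schema;
    private final SinkFunction<Row> kafkaProducer; // e.g. a FlinkKafkaProducer

    public UpsertOnlyKafkaSink(TableSchema schema, SinkFunction<Row> kafkaProducer) {
        this.schema = schema;
        this.kafkaProducer = kafkaProducer;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return schema.toRowType();
    }

    @Override
    public TableSchema getTableSchema() {
        return schema;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
        // Keep only accumulate messages (f0 == true), i.e. drop all
        // retractions/deletes on the fly, and forward the rows to Kafka.
        return stream
                .filter(t -> t.f0)
                .map(t -> t.f1)
                .returns(getRecordType())
                .addSink(kafkaProducer);
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(
            String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        return this; // sketch: schema assumed fixed
    }
}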
Hi Roman,
This is the core logic:
CREATE TABLE QueryResult (
  queryId  BIGINT,
  itemId   STRING,
  quantity INT
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'query.output.log.1',
'connector.properties.bootstrap.servers' = '',
'format.type'
Hi Gyula,
Could you provide the code of your Flink program, the error with the stack
trace, and the Flink version?
Thanks,
Roman
On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra wrote:
> Hi All!
>
> Excuse my stupid question, I am pretty new to the Table/SQL API and I am
> trying to play around with it
Hi David,
could you please explain what you are actually trying to achieve?
It seems like you are reading some files from S3 in SinkFunction#open and
putting them into state (bootstrapping?).
How many instances of the sink are executed?
How do you shard the buckets, e.g. how do you avoid reading
Hello Arvid,
After some investigation with the help of my colleague, we finally found
the root cause.
To speed up the initialization of the state, I had created some threads to
parallelize the reading of the bucket files.
This is a temporary solution, because I plan to use the State
Processor API.
Hi All!
Excuse my stupid question; I am pretty new to the Table/SQL API and I am
trying to play around with it, implementing and running a few use cases.
I have a simple window join + aggregation, grouped on some id, that I want
to write to Kafka, but I am hitting the following error:
Hi Felipe,
Please find the answers to your questions below.
> Each "operator_subtask_index" means each instance of the parallel
physical operator, doesn't it?
Yes.
> How can I set a fixed ID for the "operator_id" in my code so I can
identify quickly which operator I am measuring?
You are using
I implemented a custom function that throws a runtime exception.
You can extend the simpler MapFunction or the more complicated
RichParallelSourceFunction, depending on your use case.
You can add logic to throw a runtime exception on a certain condition in the
map or run method, as in the sketch below.
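A minimal sketch (the failure condition is just an example):

import org.apache.flink.api.common.functions.MapFunction;

public class FailingMapper implements MapFunction<Long, Long> {
    @Override
    public Long map(Long value) {
        if (value % 1000 == 0) { // illustrative condition
            throw new RuntimeException("Injected failure for testing");
        }
        return value;
    }
}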
Hi community,
I am tracking the latency of operators in Flink according to this reference
[1]. When I am using Prometheus+Grafana I can issue a query using
"flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency"
and I can check the percentiles of each "operator_id"
I could basically list a few things I want to set (execution.target for
example), but it's fair to assume that I would like to be able to set
anything :)
Gyula
On Thu, Mar 5, 2020 at 11:35 AM Jingsong Li wrote:
> Hi Gyula,
>
> Maybe Blink planner has invoked
Hi Gyula,
Maybe the Blink planner has invoked "StreamExecutionEnvironment.configure";
which planner do you use?
But "StreamExecutionEnvironment.configure" only covers partial
configuration; it cannot cover every configuration in flink-conf.yaml.
So which config do you want to set? I know some config
Hi Gyula,
Flink configurations can be overridden via `TableConfig#getConfiguration()`;
however, the SQL CLI only allows setting Table-specific configs.
I consider this a bug/improvement of the SQL CLI that should be fixed in
1.10.1.
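A sketch of the programmatic override (tEnv is an existing TableEnvironment;
the key is just an example):

tEnv.getConfig().getConfiguration()
    .setString("table.exec.mini-batch.enabled", "true");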
Best,
Jark
On Thu, 5 Mar 2020 at 18:12, Gyula Fóra wrote:
> Thanks
Thanks Caizhi,
This seems like a pretty big shortcoming for any multi-user/multi-app
environment. I will open a jira for this.
Gyula
On Thu, Mar 5, 2020 at 10:58 AM Caizhi Weng wrote:
> Hi Gyula.
>
> I'm afraid there is no way to override all Flink configurations currently.
> SQL client yaml
Hi Andrey,
Thanks for driving this significant FLIP. From the user ML, we also know
there are
many users running Flink in container environments, so the Docker image
will be a
very basic requirement. Just as you say, we should provide a unified place
for all the various
usages (e.g. session,
Hi Gyula.
I'm afraid there is no way to override all Flink configurations currently.
The SQL client YAML file can only override some of the Flink configurations.
Configuration entries can indeed only set Table-specific configs, while
deployment entries are used to set the result-fetching address and
Hi All!
I am trying to understand if there is any way to override Flink
configuration parameters when starting the SQL Client.
It seems that the only way to pass any parameters is through the
environment YAML.
There I found two possible routes:
configuration: this doesn't work, as it only sets