DataStream<...> withTimestampsAndWatermarks1 = formatStream1
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<...>(
Looks like you need to assign timestamps and emit watermarks for both of the
streams, viz. formatStream1 and formatStream2, as described at
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html
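For example (a minimal sketch, assuming the streams carry Tuple2<String, String> elements
and the event time lives in field f1 as epoch millis; adapt both to your actual types):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<Tuple2<String, String>> withTimestampsAndWatermarks1 = formatStream1
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, String>>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, String> element) {
                        // placeholder: f1 assumed to hold an epoch-millis timestamp
                        return Long.parseLong(element.f1);
                    }
                });
// repeat the same for formatStream2, then join the two watermarked streams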
On Thu, Nov 22, 2018 at 10:55 PM Abhijeet Kumar
wrote:
Hello Team,
I'm new to Flink, coming from a Spark background. I need help completing
this streaming job. I'm reading data from two different Kafka topics and I want to
join them.
My code:
formatStream1.join(formatStream2)
        .where(new KeySelector<..., String>() {
Hi Vishal,
AFAIK, the current behavior of the Kafka source is to always use the checkpoint
state as the start position, ignoring other configuration.
A workaround I can come up with is to set a new uid on your Kafka
source and restore your job with `allowNonRestoredState`.
Therefore, you can use
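For example (a sketch; the uid string, topic, and savepoint path are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer011<>("topic", new SimpleStringSchema(), props))
        .uid("kafka-source-v2"); // new uid, so the old source state is no longer matched

// then restore while dropping the now-unmatched source state:
// bin/flink run -s <savepointPath> --allowNonRestoredState your-job.jar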
Hi Akshay,
Sorry, I have not thought of a proper way to handle a single large record in
distributed task managers in Flink. But I can give some hints on adjusting the
related memory settings to work around the OOM issue.
A large fraction of the memory in a task manager is managed by Flink for efficiency,
and
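For reference, the kind of knobs involved (a sketch for flink-conf.yaml around
Flink 1.6; the values are only illustrative):

taskmanager.heap.size: 4096m
# lower the managed-memory share to leave more free heap for large records
taskmanager.memory.fraction: 0.5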
Yes, you are right. I added logging to record the time of each seek and found that
sometimes it is very slow. Then I used the RocksDB files to test locally,
and the same problem appeared. It is very weird to find that RocksDB's seek
iterates over the data one by one. Until now, I have added a cache for RocksDB. The time is
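For anyone hitting the same thing, a sketch of how a block cache can be wired into
Flink's RocksDB state backend (the 256 MB cache size is only illustrative):

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class BlockCacheOptions implements OptionsFactory {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        // 256 MB block cache for read paths such as seek()
        return currentOptions.setTableFormatConfig(
                new BlockBasedTableConfig().setBlockCacheSize(256 * 1024 * 1024));
    }
}

// usage:
// RocksDBStateBackend backend = new RocksDBStateBackend(checkpointPath);
// backend.setOptions(new BlockCacheOptions());
// env.setStateBackend(backend);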
Hi,
Thanks for your reply. I tried running a simple "group by" on just one
dataset where a few keys occur repeatedly (on the order of millions) and
did not include any joins. I wanted to see if this issue is specific to
join. But as I was expecting, I ran into the same issue. I am giving
Thanks Till,
I’ve raised a JIRA for this: https://issues.apache.org/jira/browse/FLINK-10988.
Let me know if there's anything else I can add to the JIRA to help
Regards,
Scott
SCOTT SUE
CHIEF TECHNOLOGY OFFICER
Support Line : +44(0) 2031 371 603
Mobile : +852 9611 3969
9/F, 33 Lockhart Road,
I can see the benefit for other users as well. One could include it as part
of some development/debugging tooling, for example. It would not strictly
need to go into Flink, but it would have the benefit of better/increased
visibility, I guess. In that sense, opening a JIRA issue and posting on dev
Hi Kasif,
I think in this situation it is best if you define your own custom
RestartStrategy by specifying a class which has a `RestartStrategyFactory
createFactory(Configuration configuration)` method as `restart-strategy:
MyRestartStrategyFactoryFactory` in `flink-conf.yaml`.
Cheers,
Till
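A sketch of the shape described above (these are internal Flink interfaces, so exact
signatures may vary between versions; the delegated FixedDelayRestartStrategy is
only an example):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;

public class MyRestartStrategyFactoryFactory {

    // Flink looks up this static method reflectively when flink-conf.yaml
    // contains: restart-strategy: com.example.MyRestartStrategyFactoryFactory
    public static RestartStrategyFactory createFactory(Configuration configuration) {
        return new RestartStrategyFactory() {
            @Override
            public RestartStrategy createRestartStrategy() {
                // delegate to a built-in strategy, or return your own implementation
                return new FixedDelayRestartStrategy(3, 10_000L);
            }
        };
    }
}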
On
Hi Till,
Yeah, I think that would work, especially knowing this isn't something that is
available out of the box at the moment. Do you think it's worth raising this as a feature
request at all? One thing I've found in my experience with Flink is that
it's quite hard to debug what is going on when
Hi Scott,
I think you could write some wrappers for the different user function types
which could contain the logging logic. That way you would still need to
wrap your actual business logic, but you wouldn't have to duplicate the logging
logic over and over again.
If you also want to log the state, then you
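A minimal sketch of the wrapper idea for a MapFunction (class name and log
messages are illustrative):

import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingMapFunction<IN, OUT> implements MapFunction<IN, OUT> {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingMapFunction.class);

    private final MapFunction<IN, OUT> wrapped;

    public LoggingMapFunction(MapFunction<IN, OUT> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public OUT map(IN value) throws Exception {
        // log input and output around the wrapped business logic
        LOG.debug("input: {}", value);
        OUT result = wrapped.map(value);
        LOG.debug("output: {}", result);
        return result;
    }
}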
Hi,
I think you need a custom `RestClientFactory` which enables basic auth
on the ElasticSearch RestClient according to this documentation [1]. You
can set the RestClientFactory on the ElasticsearchSink.Builder.
[1]
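A sketch of the idea for the Flink Elasticsearch 6.x connector ("elastic" /
"changeme" are placeholder credentials; plug this into your existing builder):

import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.types.Row;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;

ElasticsearchSink.Builder<Row> esSinkBuilder = ...; // your existing builder
esSinkBuilder.setRestClientFactory(restClientBuilder ->
        restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
            BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(
                    AuthScope.ANY,
                    new UsernamePasswordCredentials("elastic", "changeme"));
            return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
        }));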
Hi Akshay,
Flink currently does not support automatically distributing hot keys
across different JVMs. What you can do is to adapt the parallelism/number
of partitions manually if you encounter that one partition contains a lot
of hot keys. This might mitigate the problem by partitioning the hot
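For example (a sketch with an illustrative Tuple2 stream and parallelism):

// give the hot keyed aggregation more parallel instances than the default
DataStream<Tuple2<String, Long>> counts = stream
        .keyBy(0)
        .sum(1)
        .setParallelism(128);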
Hi Bastien,
the OutputFormat specifies how a given record is written to an external
system. The DataSinks using these formats do not support broadcast
variables. This is currently a limitation of Flink.
What you could do is introduce a mapper before your sink which enriches
the records
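For example (a minimal sketch; "bcVar" and the enrichment logic are placeholders):

import java.util.List;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

DataSet<String> toWrite = ...;      // records heading to the sink
DataSet<Integer> toBroadcast = ...; // the data you wanted in the OutputFormat

toWrite
    .map(new RichMapFunction<String, String>() {
        private List<Integer> bcVar;

        @Override
        public void open(Configuration parameters) {
            // read the broadcast set once per parallel instance
            bcVar = getRuntimeContext().getBroadcastVariable("bcVar");
        }

        @Override
        public String map(String value) {
            // enrich / control the record using the broadcast data
            return value + "|" + bcVar.size();
        }
    })
    .withBroadcastSet(toBroadcast, "bcVar")
    .output(yourOutputFormat);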
Hi Krishna,
I think the problem is that you are trying to pass dynamic properties
(-Dconfig.file=dev.conf) to an already started cluster. The Flink
cluster components or their JVMs need to know the env.java.opts at cluster
start-up time and not when the Flink job is submitted. You can
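For example, in flink-conf.yaml before the cluster is started (a sketch):

# picked up by every JVM that Flink starts, not just the submitting client
env.java.opts: -Dconfig.file=dev.conf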
Hello All,
I have a Flink application that inherits configuration from
application.conf in the resources folder.
Now, I want to run this application on a cluster. I begin by creating a fat
jar with "mvn clean assembly".
This jar file is then executed with the following command:
Hello,
I would like to use a broadcast variable in my OutputFormat (to pass some
information and control the execution flow).
How would I do it?
.output does not have a .withBroadcast function as it does not extend
SingleInputUdfOperator
--
Bastien DINE
Data Architect / Software
Btw, how did you make sure that it is stuck in the seek call and that the trace
does not show different invocations of seek? This can indicate that seek is
slow, but it is not yet proof that you are stuck.
> On 22. Nov 2018, at 13:01, liujiangang wrote:
>
> This is not my case. Thank you.
This is not my case. Thank you.
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi Till,
sorry for the late reply, I was on holidays and couldn't follow up the
issue.
1. Flink 1.6.1, Kafka 1.1.0
2. The topic has 64 partitions. We don't have so many slots available but
we could try this.
3. Yes, they are running in different nodes
4. I meant that until the operator that is
Thanks Flavio.
This looks very useful.
AFAIK, the Calcite community is also working on functions for JSON <-> Text
conversions which are part of SQL:2016.
Hopefully, we can leverage their implementations in Flink's SQL support.
Best, Fabian
On Tue., 20 Nov. 2018 at 18:27, Flavio wrote:
Hi Akshay,
You have encountered a known issue where serializing large records can cause OOM.
Previously, every subpartition would create a separate serializer, and each
serializer would maintain an internal byte array for storing intermediate
serialization results. The key point is that these overheads
Hi,
Which TableSource and TableSink do you use?
Best, Fabian
On Mon., 19 Nov. 2018 at 15:39, miki haiat wrote:
> can you share your entire code please
>
> On Mon, Nov 19, 2018 at 4:03 PM 徐涛 wrote:
>
>> Hi Experts,
>> I use the following sql, and sink to mysql,
>> select
Hi,
are your RocksDB instances running on local SSDs or on something like EBS? I
have previously seen cases where this happened because some EBS quota was
exhausted and the performance got throttled.
Best,
Stefan
> On 22. Nov 2018, at 09:51, liujiangang wrote:
>
> Thank you very much. I
Thank you very much. I have something to say. Each record is 20KB. The
parallelism is 500 and each taskmanager has 10GB of memory. The memory is enough,
and I think the parallelism is big enough. Only the intervalJoin thread is
above 100% because of RocksDB's seek. I am confused about why RocksDB's seek
After I installed x-pack in my Elasticsearch cluster and enabled basic auth on the
cluster, the Elasticsearch sink can't connect to the Elasticsearch cluster. Code like:
DataStream<Tuple2<Boolean, Row>> esSink27 = tableEnv13.toRetractStream(esTable26, Row.class).filter(tuple -> tuple.f0); //generate