Re: error while joining two datastreams

2018-11-22 Thread Abhijeet Kumar
DataStream> withTimestampsAndWatermarks1 = formatStream1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor>(

Re: error while joining two datastreams

2018-11-22 Thread Nagarjun Guraja
Looks like you need to assign timestamps and emit watermarks on both streams, viz. formatStream1 and formatStream2, as described at https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html
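A minimal sketch of that advice, assuming a hypothetical POJO `Event` with a `long timestamp` field in milliseconds (names are illustrative, not from the thread); note the extractor must be applied to both streams before the join:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // allow events to arrive up to 10 seconds out of order (illustrative bound)
    DataStream<Event> withTs1 = formatStream1.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(Event element) {
                    return element.timestamp; // event-time field in milliseconds
                }
            });
    // the second stream needs the same treatment
    DataStream<Event> withTs2 = formatStream2.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(Event element) {
                    return element.timestamp;
                }
            });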

error while joining two datastreams

2018-11-22 Thread Abhijeet Kumar
Hello Team, I'm new to Flink and coming from a Spark background. I need help in completing this stream job. I'm reading data from two different Kafka topics and I want to join them. My code: formatStream1.join(formatStream2).where(new KeySelector, String>() {
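For context, a windowed join along these lines is what the snippet is building toward; this is a sketch under the assumption that both streams carry a hypothetical `Event` POJO exposing a String `id` key:

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    formatStream1.join(formatStream2)
            .where(new KeySelector<Event, String>() {        // key of the first stream
                @Override
                public String getKey(Event e) { return e.id; }
            })
            .equalTo(new KeySelector<Event, String>() {      // key of the second stream
                @Override
                public String getKey(Event e) { return e.id; }
            })
            .window(TumblingEventTimeWindows.of(Time.seconds(30)))
            .apply(new JoinFunction<Event, Event, String>() {
                @Override
                public String join(Event left, Event right) {
                    return left.id; // combine the two sides as needed
                }
            });

Event-time windows like this only fire once watermarks advance, which is why the replies above insist on assigning timestamps and watermarks to both inputs.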

Re: Reset kafka offsets to latest on restart

2018-11-22 Thread Tony Wei
Hi Vishal, AFAIK, the current behavior of the kafka source is to always use the checkpoint state as the start position, ignoring other configuration. A workaround I can come up with is to assign a new uid to your kafka source and restore your job with `allowNonRestoredState`. Therefore, you can use
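A sketch of that workaround; the topic, schema, uid, and the `env`/`kafkaProps` variables are illustrative assumptions:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    FlinkKafkaConsumer011<String> consumer =
            new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), kafkaProps);
    consumer.setStartFromLatest(); // used because no offsets will be restored for the new uid
    env.addSource(consumer).uid("kafka-source-v2"); // new uid => old source state is not matched

    // resubmit, skipping the state that no longer matches any operator:
    // flink run -s <savepointPath> --allowNonRestoredState job.jar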

Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread zhijiang
Hi Akshay, Sorry, I have not thought of a proper way to handle a single large record in distributed task managers in Flink. But I can give some hints for adjusting the related memory settings to work around the OOM issue. A large fraction of the memory in the task manager is managed by Flink for efficiency, and
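For reference, the knobs zhijiang is alluding to live in flink-conf.yaml; the values below are placeholders, not recommendations:

    # shrink the fraction of heap that Flink manages, leaving more room for user records
    taskmanager.memory.fraction: 0.5
    # overall task manager heap (Flink 1.6 key)
    taskmanager.heap.mb: 10240
    # fraction of memory reserved for network buffers
    taskmanager.network.memory.fraction: 0.1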

Re: IntervalJoin is stuck in rocksdb's seek for too long in flink-1.6.2

2018-11-22 Thread liujiangang
Yes, you are right. I added logging to record the time of seek and found that it is sometimes very slow. Then I used rocksdb's files to test locally and the same problem appeared. It is very weird that rocksdb's seek iterates over the data one by one. For now, I have added a cache for rocksdb. The time is
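A sketch of what "add cache for rocksdb" can look like via the RocksDB state backend's OptionsFactory; the 256 MB cache size and the `rocksDbBackend` variable are illustrative assumptions:

    import org.apache.flink.contrib.streaming.state.OptionsFactory;
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;

    public class BlockCacheOptions implements OptionsFactory {
        @Override
        public DBOptions createDBOptions(DBOptions currentOptions) {
            return currentOptions; // leave DB-level options untouched
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
            // give the block cache more room so seeks hit memory instead of disk
            return currentOptions.setTableFormatConfig(
                    new BlockBasedTableConfig().setBlockCacheSize(256 * 1024 * 1024L));
        }
    }
    // rocksDbBackend.setOptions(new BlockCacheOptions());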

Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread Akshay Mendole
Hi, Thanks for your reply. I tried running a simple "group by" on just one dataset where a few keys occur repeatedly (on the order of millions of times) and did not include any joins. I wanted to see if this issue is specific to join, but as I was expecting, I ran into the same issue. I am giving

Re: Logging Kafka during exceptions

2018-11-22 Thread Scott Sue
Thanks Till, I’ve raised a JIRA for this: https://issues.apache.org/jira/browse/FLINK-10988. Let me know if there’s anything else I can add to the JIRA to help. Regards, Scott

Re: Logging Kafka during exceptions

2018-11-22 Thread Till Rohrmann
I can see the benefit for other users as well. One could include it as part of some development/debugging tools, for example. It would not strictly need to go into Flink but it would have the benefit of better/increased visibility I guess. In that sense, opening a JIRA issue and posting on dev

Re: Flink restart strategy on specific exception

2018-11-22 Thread Till Rohrmann
Hi Kasif, I think in this situation it is best if you define your own custom RestartStrategy by specifying a class which has a `RestartStrategyFactory createFactory(Configuration configuration)` method as `restart-strategy: MyRestartStrategyFactoryFactory` in `flink-conf.yaml`. Cheers, Till
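A sketch of the shape Till describes; everything beyond the method signature he quotes is an assumption, with a fixed-delay strategy standing in for the custom policy:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
    import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
    import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;

    public class MyRestartStrategyFactoryFactory {
        // Flink loads the class named under `restart-strategy` and calls this reflectively
        public static RestartStrategyFactory createFactory(Configuration configuration) {
            return new RestartStrategyFactory() {
                @Override
                public RestartStrategy createRestartStrategy() {
                    // custom restart policy goes here, e.g. keyed off the failure cause;
                    // fixed-delay (3 attempts, 10 s apart) used as a stand-in
                    return new FixedDelayRestartStrategy(3, 10_000L);
                }
            };
        }
    }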

Re: Logging Kafka during exceptions

2018-11-22 Thread Scott Sue
Hi Till, Yeah, I think that would work, especially knowing this isn’t something that is available out of the box at the moment. Do you think it’s worth raising this as a feature request? One thing in my experience with Flink is that it’s quite hard to debug what is going on when

Re: Logging Kafka during exceptions

2018-11-22 Thread Till Rohrmann
Hi Scott, I think you could write some wrappers for the different user function types which could contain the logging logic. That way you would still need to wrap your actual business logic, but you don't have to duplicate the logic over and over again. If you also want to log the state, then you
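A minimal sketch of such a wrapper for MapFunction (one wrapper per function type would be needed; names are illustrative):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingMapWrapper<IN, OUT> implements MapFunction<IN, OUT> {
        private static final Logger LOG = LoggerFactory.getLogger(LoggingMapWrapper.class);
        private final MapFunction<IN, OUT> wrapped;

        public LoggingMapWrapper(MapFunction<IN, OUT> wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public OUT map(IN value) throws Exception {
            try {
                return wrapped.map(value);
            } catch (Exception e) {
                LOG.error("Failed on record: {}", value, e); // log the offending input record
                throw e;
            }
        }
    }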

Re: elasticsearch sink can't connect to elastic cluster with BasicAuth

2018-11-22 Thread Till Rohrmann
Hi, I think you need a custom `RestClientFactory` which enables basic auth on the ElasticSearch RestClient according to this documentation [1]. You can set the RestClientFactory on the ElasticsearchSink.Builder. [1]
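A sketch of such a factory, assuming the elasticsearch6 connector; the user and password are placeholders:

    import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.impl.client.BasicCredentialsProvider;

    RestClientFactory withBasicAuth = restClientBuilder ->
            restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
                // register the x-pack basic-auth credentials with the underlying HTTP client
                BasicCredentialsProvider credentials = new BasicCredentialsProvider();
                credentials.setCredentials(AuthScope.ANY,
                        new UsernamePasswordCredentials("user", "password"));
                return httpClientBuilder.setDefaultCredentialsProvider(credentials);
            });
    // esSinkBuilder.setRestClientFactory(withBasicAuth);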

Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread Till Rohrmann
Hi Akshay, Flink currently does not support automatically distributing hot keys across different JVMs. What you can do is adapt the parallelism/number of partitions manually if you find that one partition contains a lot of hot keys. This might mitigate the problem by partitioning the hot

Re: DataSet - Broadcast set in output format

2018-11-22 Thread Till Rohrmann
Hi Bastien, the OutputFormat specifies how a given record is written to an external system. The DataSinks using these formats do not support broadcast variables; this is currently a limitation of Flink. What you could do is introduce a mapper before your sink which enriches the records
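A minimal sketch of that workaround in the DataSet API; `records`, `controlSet`, and `myOutputFormat` are illustrative assumptions:

    import java.util.List;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.configuration.Configuration;

    DataSet<String> enriched = records
            .map(new RichMapFunction<String, String>() {
                private List<String> control;

                @Override
                public void open(Configuration parameters) {
                    // broadcast variables are only accessible inside rich functions
                    control = getRuntimeContext().getBroadcastVariable("control");
                }

                @Override
                public String map(String value) {
                    return value + "|" + control.get(0); // enrich the record before the sink
                }
            })
            .withBroadcastSet(controlSet, "control");
    enriched.output(myOutputFormat); // the output format itself stays broadcast-free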

Re: Passing application configuring to Flink uber jar

2018-11-22 Thread Till Rohrmann
Hi Krishna, I think the problem is that you are trying to pass dynamic properties (-Dconfig.file=dev.conf) to an already started cluster. The Flink cluster components or their JVMs need to know the env.java.opts at cluster start-up time, not when the Flink job is submitted. You can
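Concretely, the setting Till refers to goes into flink-conf.yaml before the cluster is brought up:

    # picked up by the JobManager/TaskManager JVMs at cluster start, not at job submission
    env.java.opts: -Dconfig.file=dev.conf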

Passing application configuring to Flink uber jar

2018-11-22 Thread Krishna Kalyan
Hello All, I have a Flink application that inherits configuration from application.conf in the resources folder. Now, I want to run this application on a cluster. I begin by creating a fat jar with "mvn clean assembly". This jar file is then executed by running the command below

DataSet - Broadcast set in output format

2018-11-22 Thread bastien dine
Hello, I would like to use a broadcast variable in my output format (to pass some information and control execution flow). How would I do it? .output does not have a .withBroadcastSet function, as it does not extend SingleInputUdfOperator. -- Bastien DINE

Re: IntervalJoin is stuck in rocksdb's seek for too long in flink-1.6.2

2018-11-22 Thread Stefan Richter
Btw, how did you make sure that it is stuck in the seek call and that the trace does not show different invocations of seek? This can indicate that seek is slow, but is not yet proof that you are stuck.

Re: IntervalJoin is stuck in rocksdb's seek for too long in flink-1.6.2

2018-11-22 Thread liujiangang
This is not my case. Thank you.

Re: Unbalanced Kafka consumer consumption

2018-11-22 Thread Gerard Garcia
Hi Till, sorry for the late reply, I was on holidays and couldn't follow up on the issue. 1. Flink 1.6.1, Kafka 1.1.0. 2. The topic has 64 partitions. We don't have that many slots available, but we could try this. 3. Yes, they are running on different nodes. 4. I meant that until the operator that is

Re: Flink JSON (string) to Pojo (and vice versa) example

2018-11-22 Thread Fabian Hueske
Thanks Flavio. This looks very useful. AFAIK, the Calcite community is also working on functions for JSON <-> Text conversions which are part of SQL:2016. Hopefully, we can leverage their implementations in Flink's SQL support. Best, Fabian

Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread zhijiang
Hi Akshay, You encountered an existing issue where serializing large records causes OOM. Previously, every subpartition would create a separate serializer, and each serializer would maintain an internal byte array for storing intermediate serialization results. The key point is that this overhead

Re: Deadlock happens when sink to mysql

2018-11-22 Thread Fabian Hueske
Hi, Which TableSource and TableSink do you use? Best, Fabian

Re: IntervalJoin is stuck in rocksdb's seek for too long in flink-1.6.2

2018-11-22 Thread Stefan Richter
Hi, are your RocksDB instances running on local SSDs or on something like EBS? I have previously seen cases where this happened because some EBS quota was exhausted and the performance got throttled. Best, Stefan

Re: IntervalJoin is stuck in rocksdb's seek for too long in flink-1.6.2

2018-11-22 Thread liujiangang
Thank you very much. I have something to say. Each record is 20KB. The parallelism is 500 and each taskmanager has 10GB of memory. The memory is enough, and I think the parallelism is big enough. Only the intervalJoin thread is beyond 100% because of rocksdb's seek. I am confused about why rocksdb's seek

elasticsearch sink can't connect to elastic cluster with BasicAuth

2018-11-22 Thread hzyuemeng1
After I installed x-pack in my elasticsearch cluster and enabled BasicAuth on it, the elasticsearch sink can't connect to the elastic cluster. Code like: DataStream> esSink27 = tableEnv13.toRetractStream(esTable26, Row.class).filter(tuple -> tuple.f0); //generate