Re: error while joining two datastream

2018-11-22 Thread Abhijeet Kumar
DataStream<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>> withTimestampsAndWatermarks1 = formatStream1
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>>(
                        Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public long extractTimestamp(Tuple11<String, String, String, String, String, String, String, String, String, String, Long> element) {
                        // the event timestamp is carried in the last field
                        return element.f10;
                    }
                });

DataStream<Tuple7<String, String, String, String, String, String, Long>> withTimestampsAndWatermarks2 = formatStream2
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple7<String, String, String, String, String, String, Long>>(
                        Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public long extractTimestamp(Tuple7<String, String, String, String, String, String, Long> element) {
                        // the event timestamp is carried in the last field
                        return element.f6;
                    }
                });

withTimestampsAndWatermarks1.print();
withTimestampsAndWatermarks2.print();

DataStream<Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>> joined = withTimestampsAndWatermarks1
        .join(withTimestampsAndWatermarks2)
        .where(new KeySelector<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, String>() {

            private static final long serialVersionUID = 1L;

            public String getKey(Tuple11<String, String, String, String, String, String, String, String, String, String, Long> t1)
                    throws Exception {
                return t1.f0;
            }
        }).equalTo(new KeySelector<Tuple7<String, String, String, String, String, String, Long>, String>() {

            private static final long serialVersionUID = 1L;

            public String getKey(Tuple7<String, String, String, String, String, String, Long> t1)
                    throws Exception {
                return t1.f0;
            }
        }).window(TumblingEventTimeWindows.of(Time.seconds(15)))
        .apply(new JoinFunction<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>,
                Tuple7<String, String, String, String, String, String, Long>,
                Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>>() {

            private static final long serialVersionUID = 1L;

            public Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long> join(
                    Tuple11<String, String, String, String, String, String, String, String, String, String, Long> first,
                    Tuple7<String, String, String, String, String, String, Long> second) {

                return new Tuple17<>(first.f0, first.f1, first.f2,
                        first.f3, first.f4, first.f5, first.f6, first.f7, first.f8, first.f9,
                        second.f1, second.f2, second.f3, second.f4, second.f5, second.f6, first.f10);
            }
        });

joined.print();

OK, so now I did it like this. The errors are resolved, but now I'm not able to see
any output when I print the joined datastream.

> On 23-Nov-2018, at 12:42 PM, Nagarjun Guraja  wrote:
> 
> Looks like you need to assign time stamps and emit watermarks to both the 
> streams viz. forma

Re: error while joining two datastream

2018-11-22 Thread Nagarjun Guraja
Looks like you need to assign time stamps and emit watermarks to both the
streams viz. formatStream1 and formatStream2 as described at
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html

On Thu, Nov 22, 2018 at 10:55 PM Abhijeet Kumar 
wrote:

> Hello Team,
>
> I'm new to Flink and coming from Spark background. I need help in
> completing this stream job. I'm reading data from two different Kafka
> topics and I want to join them.
>
> My code:
>
> formatStream1.join(formatStream2)
> .where(new KeySelector<Tuple11<String, String, String, String, String,
> String, String, String, String, String, Long>, String>() {
> public String getKey(Tuple11<String, String, String, String, String, String,
> String, String, String, String, Long> t1) throws Exception {
> return t1.f0;
> }
> })
> .equalTo(new KeySelector<Tuple7<String, String, String, String, String,
> String, Long>, String>() {
> public String getKey(Tuple7<String, String, String, String, String, String,
> Long> t1) throws Exception {
> return t1.f0;
> }
> }).window(TumblingEventTimeWindows.of(Time.seconds(15)))
> .apply(new JoinFunction<Tuple11<String, String, String, String, String,
> String, String, String, String, String, Long>, Tuple7<String, String, String,
> String, String, String, Long>, Tuple17<String, String, String, String, String,
> String, String, String, String, String, String, String, String, String, String,
> Long, Long>>() {
>
> public Tuple17<String, String, String, String, String, String, String, String,
> String, String, String, String, String, String, String, Long, Long>
> join(
> Tuple11<String, String, String, String, String, String, String, String,
> String, String, Long> first,
> Tuple7<String, String, String, String, String, String, Long> second) {
> return new Tuple17<>(first.f0, first.f1, first.f2, first.f3, first.f4, first.f5,
> first.f6, first.f7, first.f8, first.f9, second.f1, second.f2, second.f3,
> second.f4, second.f5, second.f6, first.f10);
> }
> }).print();
>
>
> Error:
>
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution
> failed.
> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(
> JobResult.java:146)
> at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(
> MiniCluster.java:630)
> at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(
> LocalStreamEnvironment.java:123)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(
> StreamExecutionEnvironment.java:1511)
> at com.argoid.flink.streaming.kafkajoin.JoinKafka.main(JoinKafka.java:155)
> Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE
> timestamp (= no timestamp marker). Is the time characteristic set to
> 'ProcessingTime', or did you forget to call
> 'DataStream.assignTimestampsAndWatermarks(...)'?
> at
> org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(
> TumblingEventTimeWindows.java:69)
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(
> WindowOperator.java:295)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(
> StreamInputProcessor.java:202)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:105)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
>
> In formatStream1 and formatStream2 variable data is coming I checked by
> printing them. So, the issue is in the code which I shared. Thanks in
> advance!!!
>
> Thanks,
>
>
> *Abhijeet Kumar*
> Software Development Engineer,
> Sentienz Solutions Pvt Ltd
> Cognitive Data Platform - Perceive the Data !
> abhijeet.ku...@sentienz.com |www.sentienz.com | Bengaluru
>
> --
Regards,
Nagarjun

*Success is not final, failure is not fatal: it is the courage to continue
that counts. *
*- Winston Churchill - *


error while joining two datastream

2018-11-22 Thread Abhijeet Kumar
Hello Team,

I'm new to Flink and come from a Spark background. I need help in completing
this streaming job. I'm reading data from two different Kafka topics and I want to
join them.

My code:

formatStream1.join(formatStream2)
        .where(new KeySelector<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, String>() {
            public String getKey(Tuple11<String, String, String, String, String, String, String, String, String, String, Long> t1) throws Exception {
                return t1.f0;
            }
        })
        .equalTo(new KeySelector<Tuple7<String, String, String, String, String, String, Long>, String>() {
            public String getKey(Tuple7<String, String, String, String, String, String, Long> t1) throws Exception {
                return t1.f0;
            }
        }).window(TumblingEventTimeWindows.of(Time.seconds(15)))
        .apply(new JoinFunction<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>,
                Tuple7<String, String, String, String, String, String, Long>,
                Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>>() {

            public Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long> join(
                    Tuple11<String, String, String, String, String, String, String, String, String, String, Long> first,
                    Tuple7<String, String, String, String, String, String, Long> second) {
                return new Tuple17<>(first.f0, first.f1, first.f2,
                        first.f3, first.f4, first.f5, first.f6, first.f7, first.f8, first.f9,
                        second.f1, second.f2, second.f3, second.f4, second.f5, second.f6, first.f10);
            }
        }).print();


Error:

Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
at 
com.argoid.flink.streaming.kafkajoin.JoinKafka.main(JoinKafka.java:155)
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= 
no timestamp marker). Is the time characteristic set to 'ProcessingTime', or 
did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
at 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)

Data is arriving in the formatStream1 and formatStream2 variables; I checked by
printing them. So, the issue is in the code which I shared. Thanks in advance!!!

Thanks,


Abhijeet Kumar
Software Development Engineer,
Sentienz Solutions Pvt Ltd
Cognitive Data Platform - Perceive the Data !
abhijeet.ku...@sentienz.com  
|www.sentienz.com  | Bengaluru




Re: Reset kafka offsets to latest on restart

2018-11-22 Thread Tony Wei
Hi Vishal,

AFAIK, the current behavior of the kafka source is to always use the checkpoint
state as the start position, ignoring any other configuration.
A workaround I can come up with is to set a new uid on your kafka
source and restore your job with `allowNonRestoredState`.
That way, you can use the approach that Rong provided to set the initial start
position.
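
A minimal sketch of that workaround (the connector class, topic, uid and properties below are assumptions, not taken from the original job):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class RestartFromLatestJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");

        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props);
        // the start position is only honoured when no operator state is restored for this source
        consumer.setStartFromLatest();

        env.addSource(consumer)
                // new uid, so state from the old source operator is not matched on restore
                .uid("kafka-source-v2")
                .print();

        // restore with: bin/flink run -s <savepointPath> --allowNonRestoredState ...
        env.execute("restart from latest offsets");
    }
}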

cc. Gordon, who knows more about the details of the kafka source.

Best,
Tony Wei

Rong Rong wrote on Thu, Nov 22, 2018 at 8:23 AM:

> Hi Vishal,
>
> You can probably try using similar offset configuration as a service
> consumer.
> Maybe this will be useful to look at [1]
>
> Thanks,
> Rong
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>
> On Wed, Nov 21, 2018 at 1:32 PM Jamie Grier  wrote:
>
>> Hi Vishal,
>>
>> No, there is no way to do this currently.
>>
>>
>> On Wed, Nov 21, 2018 at 10:22 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Any one ?
>>>
>>> On Tue, Nov 20, 2018 at 12:48 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 Is it possible to have checkpointing but reset the kafka offsets to
 latest on restart on failure ?

>>>


Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread zhijiang
Hi Akshay,

Sorry, I have not thought of a proper way to handle a single large record across
distributed task managers in Flink. But I can give some hints for adjusting the
related memory settings to work around the OOM issue.
A large fraction of the memory in a task manager is managed by Flink for efficiency,
and this memory lives in the JVM permanently and is not recycled by GC. You can check
the parameter "taskmanager.memory.fraction" for this; the default value is 0.7 if you
have not changed it, which means 7GB * 0.7 is used by the framework.

I am not sure which Flink version you are using. If I remember correctly, before
release-1.5 the network buffers also used heap memory by default, so you should also
subtract that part from the total task manager memory.

Not counting the network buffers used by the framework, that leaves only 7GB * 0.3 of
temporary memory for everything else. The temporary buffer in a serializer doubles its
size whenever the record does not fit, which means one serializer may need 2GB of
overhead memory for your 1GB record. You have 2 slots per task manager running two
tasks, so the total overhead may be almost 4GB. So you can decrease
"taskmanager.memory.fraction" to a lower value, increase the total task manager memory
to cover this overhead, or configure one slot per task manager.

Best,
Zhijiang


--
From: Akshay Mendole 
Sent: Nov 23, 2018 (Friday) 02:54
To: trohrmann 
Cc: zhijiang ; user ;
Shreesha Madogaran 
Subject: Re: OutOfMemoryError while doing join operation in flink

Hi,
Thanks for your reply. I tried running a simple "group by" on just one 
dataset where few keys are repeatedly occurring (in order of millions)  and did 
not include any joins. I wanted to see if this issue is specific to join. But 
as I was expecting, I ran into the same issue. I am giving 7GBs to each task 
manager with 2 slots per task manager. From what I understood so far, such 
cases where individual records somewhere in the pipeline become so large that 
they should be handled in distributed manner instead of handling them by a 
simple data structure in single JVM. I am guessing there is no way to do this 
in Flink today. 
Could you please confirm this?
Thanks,
Akshay


On Thu, Nov 22, 2018 at 9:28 PM Till Rohrmann  wrote:
Hi Akshay,

Flink currently does not support to automatically distribute hot keys across 
different JVMs. What you can do is to adapt the parallelism/number of 
partitions manually if you encounter that one partition contains a lot of hot 
keys. This might mitigate the problem by partitioning the hot keys into 
different partitions.

Apart from that, the problem seems to be as Zhijiang indicated that your join 
result is quite large. One record is 1 GB large. Try to decrease it or give 
more memory to your TMs.

Cheers,
Till
On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole  wrote:
Hi Zhijiang,
 Thanks for the quick reply. My concern is more towards how 
flink perform joins of two skewed datasets. Pig and spark seems to support the 
join of skewed datasets. The record size that you are mentioning about in your 
reply is after join operation takes place which is definitely going to be huge 
enough not to fit in jvm task manager task slot in my use case. We want to know 
if there is a way in flink to handle such skewed keys by distributing their 
values across different jvms. Let me know if you need more clarity on the issue.
Thanks, 
Akshay 
On Thu, Nov 22, 2018 at 2:38 PM zhijiang  wrote:
Hi Akshay,

You encountered an existing issue for serializing large records to cause OOM.

Every subpartition would create a separate serializer before, and each 
serializer would maintain an internal bytes array for storing intermediate 
serialization results. The key point is that these overhead internal bytes 
array are not managed by framework, and their size would exceed with the record 
size dynamically. If your job has many subpartitions with large records, it may 
probably cause OOM issue.

I already improved this issue to some extent by sharing only one serializer for 
all subpartitions [1], that means we only have one bytes array overhead at 
most. This issue is covered in release-1.7.
Currently the best option may reduce your record size if possible or you can 
increase the heap size of task manager container.

[1] https://issues.apache.org/jira/browse/FLINK-9913

Best,
Zhijiang
--
From: Akshay Mendole 
Sent: Nov 22, 2018 (Thursday) 13:43
To: user 
Subject: OutOfMemoryError while doing join operation in flink

Hi,
We are converting one of our pig pipelines to flink using apache beam. The 
pig pipeline reads two different data sets (R1 & R2)  from hdfs, enriches them, 
joins them and dumps back to hdfs. The data set R1 is skewed. In a sense, it 
has few keys with lot of records. When we converted the pig pipeline to apache 
b

Re: IntervalJoin is stuck in rocksdb's seek for too long time in flink-1.6.2

2018-11-22 Thread liujiangang
Yes, you are right. I added a log to record the time of each seek and found that
sometimes it is very slow. Then I used the RocksDB files to test locally
and the same problem appeared. It is very weird to find that RocksDB's seek
iterates over the data one by one. For now, I have added a block cache for RocksDB.
It is faster than before, but the problem is not solved completely. The added code is below:
public ColumnFamilyOptions createColumnOptions() {
    // return new ColumnFamilyOptions();
    BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig();
    blockBasedTableConfig.setBlockCacheSize(1024 * 1024 * 1024);
    ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
    columnFamilyOptions.setTableFormatConfig(blockBasedTableConfig);
    return columnFamilyOptions;
}
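
For reference, a sketch of how such column family options can be wired into the RocksDB state backend through an OptionsFactory (the checkpoint URI is a placeholder, the surrounding method must be able to throw IOException, and the exact OptionsFactory signatures should be double-checked against the Flink version in use):

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// env is the job's StreamExecutionEnvironment
RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
backend.setOptions(new OptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions; // keep the default DB-level options
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        // enable a 1 GB block cache so repeated seeks hit memory instead of disk
        return currentOptions.setTableFormatConfig(
                new BlockBasedTableConfig().setBlockCacheSize(1024 * 1024 * 1024));
    }
});
env.setStateBackend(backend);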




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread Akshay Mendole
Hi,
Thanks for your reply. I tried running a simple "group by" on just one
dataset where a few keys occur repeatedly (on the order of millions of times) and
did not include any joins. I wanted to see if this issue is specific to
joins, but as I was expecting, I ran into the same issue. I am giving 7GB
to each task manager with 2 slots per task manager. From what I have understood
so far, cases where individual records somewhere in the pipeline
become so large should be handled in a distributed manner instead
of by a simple data structure in a single JVM. I am guessing
there is no way to do this in Flink today.
Could you please confirm this?
Thanks,
Akshay


On Thu, Nov 22, 2018 at 9:28 PM Till Rohrmann  wrote:

> Hi Akshay,
>
> Flink currently does not support to automatically distribute hot keys
> across different JVMs. What you can do is to adapt the parallelism/number
> of partitions manually if you encounter that one partition contains a lot
> of hot keys. This might mitigate the problem by partitioning the hot keys
> into different partitions.
>
> Apart from that, the problem seems to be as Zhijiang indicated that your
> join result is quite large. One record is 1 GB large. Try to decrease it or
> give more memory to your TMs.
>
> Cheers,
> Till
>
> On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole 
> wrote:
>
>> Hi Zhijiang,
>>  Thanks for the quick reply. My concern is more towards
>> how flink perform joins of two *skewed *datasets. Pig
>>  and spark
>>  seems to support the
>> join of skewed datasets. The record size that you are mentioning about in
>> your reply is after join operation takes place which is definitely going to
>> be huge enough not to fit in jvm task manager task slot in my use case. We
>> want to know if there is a way in flink to handle such skewed keys by
>> distributing their values across different jvms. Let me know if you need
>> more clarity on the issue.
>> Thanks,
>> Akshay
>>
>> On Thu, Nov 22, 2018 at 2:38 PM zhijiang 
>> wrote:
>>
>>> Hi Akshay,
>>>
>>> You encountered an existing issue for serializing large records to cause
>>> OOM.
>>>
>>> Every subpartition would create a separate serializer before, and each
>>> serializer would maintain an internal bytes array for storing intermediate
>>> serialization results. The key point is that these overhead internal bytes
>>> array are not managed by framework, and their size would exceed with the
>>> record size dynamically. If your job has many subpartitions with large
>>> records, it may probably cause OOM issue.
>>>
>>> I already improved this issue to some extent by sharing only one
>>> serializer for all subpartitions [1], that means we only have one bytes
>>> array overhead at most. This issue is covered in release-1.7.
>>> Currently the best option may reduce your record size if possible or you
>>> can increase the heap size of task manager container.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-9913
>>>
>>> Best,
>>> Zhijiang
>>>
>>> --
>>> From: Akshay Mendole 
>>> Sent: Nov 22, 2018 (Thursday) 13:43
>>> To: user 
>>> Subject: OutOfMemoryError while doing join operation in flink
>>>
>>> Hi,
>>> We are converting one of our pig pipelines to flink using apache
>>> beam. The pig pipeline reads two different data sets (R1 & R2)  from hdfs,
>>> enriches them, joins them and dumps back to hdfs. The data set R1 is
>>> skewed. In a sense, it has few keys with lot of records. When we converted
>>> the pig pipeline to apache beam and ran it using flink on a production yarn
>>> cluster, we got the following error
>>>
>>> 2018-11-21 16:52:25,307 ERROR
>>> org.apache.flink.runtime.operators.BatchTask  - Error in
>>> task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
>>> java.lang.RuntimeException: Emitting the record caused an I/O exception:
>>> Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds
>>> JVM heap space
>>> at
>>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
>>> at
>>> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>> at
>>> org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
>>> at
>>> org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
>>> at
>>> org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
>>> at
>>> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>>> at
>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>> a

Re: Logging Kafka during exceptions

2018-11-22 Thread Scott Sue
Thanks Till,

I’ve raised a JIRA for this: https://issues.apache.org/jira/browse/FLINK-10988. 
Let me know if there's anything else I can add to the JIRA to help.


Regards,
Scott

SCOTT SUE
CHIEF TECHNOLOGY OFFICER

Support Line : +44(0) 2031 371 603
Mobile : +852 9611 3969

9/F, 33 Lockhart Road, Wan Chai, Hong Kong
www.celer-tech.com 






> On 23 Nov 2018, at 00:23, Till Rohrmann  wrote:
> 
> I can see the benefit for other users as well. One could include it as part 
> of some development/debugging tools, for example. It would not strictly need 
> to go into Flink but it would have the benefit of better/increased visibility 
> I guess. In that sense, opening a JIRA issue and posting on dev might be a 
> good idea to check how much interest is there.
> 
> Cheers,
> Till
> 
> On Thu, Nov 22, 2018 at 5:17 PM Scott Sue  > wrote:
> Hi Till,
> 
> Yeah I think that would work especially knowing this isn’ something that is 
> out of the box at the moment.  Do you think its worth raising this as a 
> feature request at all?  I think that’s one thing with my experience with 
> Flink is that its quite hard to debug what is going on when there is an 
> unexpected exception.
> 
> 
> Regards,
> Scott
> 
> SCOTT SUE
> CHIEF TECHNOLOGY OFFICER
> 
> Support Line : +44(0) 2031 371 603
> Mobile : +852 9611 3969
> 
> 9/F, 33 Lockhart Road, Wan Chai, Hong Kong
> www.celer-tech.com 
> 
> 
> 
> 
> 
> 
>> On 23 Nov 2018, at 00:12, Till Rohrmann > > wrote:
>> 
>> Hi Scott,
>> 
>> I think you could write some Wrappers for the different user function types 
>> which could contain the logging logic. That way you would still need to wrap 
>> you actual business logic but don't have to duplicate the logic over and 
>> over again.
>> 
>> If you also want to log the state, then you would need to wrap the 
>> RuntimeContext to interfere all state registering calls so that you can keep 
>> track of them.
>> 
>> Would that work for you?
>> 
>> Cheers,
>> Till
>> 
>> On Thu, Nov 22, 2018 at 8:44 AM Scott Sue > > wrote:
>> Yeah I think that would work for incorrect data consumed, but not for if 
>> deserialization passes correctly, but one of my custom functions post 
>> deserialization generates an error?
>> 
>> 
>> Regards,
>> Scott
>> 
>> SCOTT SUE
>> CHIEF TECHNOLOGY OFFICER
>> 
>> Support Line : +44(0) 2031 371 603
>> Mobile : +852 9611 3969
>> 
>> 9/F, 33 Lockhart Road, Wan Chai, Hong Kong
>> www.celer-tech.com 
>> 
>> 
>> 
>> 
>> 
>> 
>>> On 22 Nov 2018, at 15:15, miki haiat >> > wrote:
>>> 
>>> If so, then you can implement your own deserializer[1] with custom logic
>>> and error handling 
>>> 
>>> 
>>> 
>>> 1.https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.html
>>>  
>>> 
>>> 
>>> 
>>> On Thu, Nov 22, 2018 at 8:57 AM Scott Sue >> > wrote:
>>> Json is sent into Kafka
>>> 
>>> 
>>> Regards,
>>> Scott
>>> 
>>> SCOTT SUE
>>> CHIEF TECHNOLOGY OFFICER
>>> 
>>> Support Line : +44(0) 2031 371 603
>>> Mobile : +852 9611 3969
>>> 
>>> 9/F, 33 Lockhart Road, Wan Chai, Hong Kong
>>> www.celer-tech.com 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
 On 22 Nov 2018, at 14:55, miki haiat >>> > wrote:
 
 Which data format   is sent to kafka ? 
 Json Avro Other ?
 
 
 
 On Thu, Nov 22, 2018 at 7:36 AM Scott Sue >>> > wrote:
 Unexpected data meaning business level data that I didn’t expect to 
 receive. So business level data that doesn’t quite conform
 
 On Thu, 22 Nov 2018 at 13:30, miki haiat >>> > wrote:
  Unexpected data you mean parsing error ?
 Which format is sent to Kafka ?
 
 
 
 On Thu, 22 Nov 2018, 6:59 Scott Sue >>>  wrote:
 Hi all,
 
 When I'm running my jobs I am consuming data from Kafka to process in my
 job.  Unfortunately my job receives unexpected data from time to time which
 I'm trying to find the root cause of the issue.
 
 Ideally, I want to be able to have a way to know when the job has failed 
 due
 to an exception, to then log to file the last message that it was consuming
 at the time to help track down the offending message consumed.  How is this
 possible within Flink?
 
 Thinking about this more, it may not be a consumed message that killed the
 job, but maybe a transformation within the job itself and it died in a
 downstream Operator.  In this case, is there a way to log to f

Re: Logging Kafka during exceptions

2018-11-22 Thread Till Rohrmann
I can see the benefit for other users as well. One could include it as part
of some development/debugging tools, for example. It would not strictly
need to go into Flink but it would have the benefit of better/increased
visibility I guess. In that sense, opening a JIRA issue and posting on dev
might be a good idea to check how much interest is there.

Cheers,
Till

On Thu, Nov 22, 2018 at 5:17 PM Scott Sue  wrote:

> Hi Till,
>
> Yeah I think that would work especially knowing this isn’ something that
> is out of the box at the moment.  Do you think its worth raising this as a
> feature request at all?  I think that’s one thing with my experience with
> Flink is that its quite hard to debug what is going on when there is an
> unexpected exception.
>
>
> Regards,
> Scott
>
> SCOTT SUE
> CHIEF TECHNOLOGY OFFICER
>
> Support Line : +44(0) 2031 371 603
> Mobile : +852 9611 3969
>
> 9/F, 33 Lockhart Road, Wan Chai, Hong Kong
> www.celer-tech.com
>
>
>
>
>
>
>
> On 23 Nov 2018, at 00:12, Till Rohrmann  wrote:
>
> Hi Scott,
>
> I think you could write some Wrappers for the different user function
> types which could contain the logging logic. That way you would still need
> to wrap you actual business logic but don't have to duplicate the logic
> over and over again.
>
> If you also want to log the state, then you would need to wrap the
> RuntimeContext to interfere all state registering calls so that you can
> keep track of them.
>
> Would that work for you?
>
> Cheers,
> Till
>
> On Thu, Nov 22, 2018 at 8:44 AM Scott Sue 
> wrote:
>
>> Yeah I think that would work for incorrect data consumed, but not for if
>> deserialization passes correctly, but one of my custom functions
>> post deserialization generates an error?
>>
>>
>> Regards,
>> Scott
>>
>> SCOTT SUE
>> CHIEF TECHNOLOGY OFFICER
>>
>> Support Line : +44(0) 2031 371 603
>> Mobile : +852 9611 3969
>>
>> 9/F, 33 Lockhart Road, Wan Chai, Hong Kong
>> www.celer-tech.com
>>
>>
>>
>>
>>
>>
>>
>> On 22 Nov 2018, at 15:15, miki haiat  wrote:
>>
>> If so, then you can implement your own deserializer[1] with custom
>> logic  and error handling
>>
>>
>>
>> 1.
>> https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.html
>>
>>
>> On Thu, Nov 22, 2018 at 8:57 AM Scott Sue 
>> wrote:
>>
>>> Json is sent into Kafka
>>>
>>>
>>> Regards,
>>> Scott
>>>
>>> SCOTT SUE
>>> CHIEF TECHNOLOGY OFFICER
>>>
>>> Support Line : +44(0) 2031 371 603
>>> Mobile : +852 9611 3969
>>>
>>> 9/F, 33 Lockhart Road, Wan Chai, Hong Kong
>>> www.celer-tech.com
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 22 Nov 2018, at 14:55, miki haiat  wrote:
>>>
>>> Which data format   is sent to kafka ?
>>> Json Avro Other ?
>>>
>>>
>>>
>>> On Thu, Nov 22, 2018 at 7:36 AM Scott Sue 
>>> wrote:
>>>
 Unexpected data meaning business level data that I didn’t expect to
 receive. So business level data that doesn’t quite conform

 On Thu, 22 Nov 2018 at 13:30, miki haiat  wrote:

>  Unexpected data you mean parsing error ?
> Which format is sent to Kafka ?
>
>
>
> On Thu, 22 Nov 2018, 6:59 Scott Sue 
>> Hi all,
>>
>> When I'm running my jobs I am consuming data from Kafka to process in
>> my
>> job.  Unfortunately my job receives unexpected data from time to time
>> which
>> I'm trying to find the root cause of the issue.
>>
>> Ideally, I want to be able to have a way to know when the job has
>> failed due
>> to an exception, to then log to file the last message that it was
>> consuming
>> at the time to help track down the offending message consumed.  How
>> is this
>> possible within Flink?
>>
>> Thinking about this more, it may not be a consumed message that
>> killed the
>> job, but maybe a transformation within the job itself and it died in a
>> downstream Operator.  In this case, is there a way to log to file the
>> message that an Operator was processing at the time that caused the
>> exception?
>>
>>
>> Thanks in advance!
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
> --


 Regards,
 Scott

 SCOTT SUE
 CHIEF TECHNOLOGY OFFICER

 Support Line : +44(0) 2031 371 603 <+44%2020%203137%201603>
 Mobile : +852 9611 3969 <9611%203969>

 9/F, 33 Lockhart Road, Wanchai, Hong Kong
 www.celer-tech.com

 *This message, including any attachments, may include private,
 privileged and confidential information and is intended only for the
 personal and confidential use of the intended recipient(s). If the reader
 of this message is not an intended recipient, you are hereby notified that
 any review, use, dissemination, distribution, printing or copying of this
 message or its contents is strictly prohibited and may be unlawful. 

Re: Flink restart strategy on specific exception

2018-11-22 Thread Till Rohrmann
Hi Kasif,

I think in this situation it is best if you define your own custom
RestartStrategy by specifying a class which has a `RestartStrategyFactory
createFactory(Configuration configuration)` method as `restart-strategy:
MyRestartStrategyFactoryFactory` in `flink-conf.yaml`.
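
A very rough skeleton of that approach (the class and package names are made up, and RestartStrategy/RestartStrategyFactory are internal runtime interfaces whose exact signatures may differ between Flink versions, so treat this only as a sketch):

package com.mycompany.flink; // hypothetical package, referenced from flink-conf.yaml

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;

public class MyRestartStrategyFactoryFactory {

    // Flink calls this static method when flink-conf.yaml contains
    // restart-strategy: com.mycompany.flink.MyRestartStrategyFactoryFactory
    public static RestartStrategyFactory createFactory(Configuration configuration) {
        return new RestartStrategyFactory() {
            @Override
            public RestartStrategy createRestartStrategy() {
                return new RestartStrategy() {
                    @Override
                    public boolean canRestart() {
                        // application-specific rules go here, e.g. only allow a
                        // restart when the "not enough slots" situation was detected
                        return true;
                    }

                    @Override
                    public void restart(RestartCallback restarter, ScheduledExecutor executor) {
                        restarter.triggerFullRecovery();
                    }
                };
            }
        };
    }
}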

Cheers,
Till

On Thu, Nov 22, 2018 at 7:18 AM Ali, Kasif  wrote:

> Hello,
>
>
>
> Looking at existing restart strategies they are kind of generic. We have a
> requirement to restart the job only in case of specific exception/issues.
>
> What would be the best way to have a re start strategy which is based on
> few rules like looking at particular type of exception or some extra
> condition checks which are application specific.?
>
>
>
> Just a background on one specific issue which invoked this requirement is
> slots not getting released when the job finishes. In our applications, we
> keep track of jobs submitted with the amount of parallelism allotted to
> it.  Once the job finishes we assume that the slots are free and try to
> submit next set of jobs which at times fail with error  “not enough slots
> available”.
>
>
>
> So we think a job re start can solve this issue but we only want to re
> start only if this particular situation is encountered.
>
>
>
> Please let us know If there are better ways to solve this problem other
> than re start strategy.
>
>
>
> Thanks,
>
> Kasif
>
>
>
> --
>
> Your Personal Data: We may collect and process information about you that
> may be subject to data protection laws. For more information about how we
> use and disclose your personal data, how we protect your information, our
> legal basis to use your information, your rights and who you can contact,
> please refer to: www.gs.com/privacy-notices
>


Re: Logging Kafka during exceptions

2018-11-22 Thread Scott Sue
Hi Till,

Yeah, I think that would work, especially knowing this isn't something that is out
of the box at the moment.  Do you think it's worth raising this as a feature
request at all?  One thing from my experience with Flink is that
it's quite hard to debug what is going on when there is an unexpected exception.


Regards,
Scott

SCOTT SUE
CHIEF TECHNOLOGY OFFICER

Support Line : +44(0) 2031 371 603
Mobile : +852 9611 3969

9/F, 33 Lockhart Road, Wan Chai, Hong Kong
www.celer-tech.com 






> On 23 Nov 2018, at 00:12, Till Rohrmann  wrote:
> 
> Hi Scott,
> 
> I think you could write some Wrappers for the different user function types 
> which could contain the logging logic. That way you would still need to wrap 
> you actual business logic but don't have to duplicate the logic over and over 
> again.
> 
> If you also want to log the state, then you would need to wrap the 
> RuntimeContext to interfere all state registering calls so that you can keep 
> track of them.
> 
> Would that work for you?
> 
> Cheers,
> Till
> 
> On Thu, Nov 22, 2018 at 8:44 AM Scott Sue  > wrote:
> Yeah I think that would work for incorrect data consumed, but not for if 
> deserialization passes correctly, but one of my custom functions post 
> deserialization generates an error?
> 
> 
> Regards,
> Scott
> 
> SCOTT SUE
> CHIEF TECHNOLOGY OFFICER
> 
> Support Line : +44(0) 2031 371 603
> Mobile : +852 9611 3969
> 
> 9/F, 33 Lockhart Road, Wan Chai, Hong Kong
> www.celer-tech.com 
> 
> 
> 
> 
> 
> 
>> On 22 Nov 2018, at 15:15, miki haiat > > wrote:
>> 
>> If so, then you can implement your own deserializer[1] with custom logic
>> and error handling 
>> 
>> 
>> 
>> 1.https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.html
>>  
>> 
>> 
>> 
>> On Thu, Nov 22, 2018 at 8:57 AM Scott Sue > > wrote:
>> Json is sent into Kafka
>> 
>> 
>> Regards,
>> Scott
>> 
>> SCOTT SUE
>> CHIEF TECHNOLOGY OFFICER
>> 
>> Support Line : +44(0) 2031 371 603
>> Mobile : +852 9611 3969
>> 
>> 9/F, 33 Lockhart Road, Wan Chai, Hong Kong
>> www.celer-tech.com 
>> 
>> 
>> 
>> 
>> 
>> 
>>> On 22 Nov 2018, at 14:55, miki haiat >> > wrote:
>>> 
>>> Which data format   is sent to kafka ? 
>>> Json Avro Other ?
>>> 
>>> 
>>> 
>>> On Thu, Nov 22, 2018 at 7:36 AM Scott Sue >> > wrote:
>>> Unexpected data meaning business level data that I didn’t expect to 
>>> receive. So business level data that doesn’t quite conform
>>> 
>>> On Thu, 22 Nov 2018 at 13:30, miki haiat >> > wrote:
>>>  Unexpected data you mean parsing error ?
>>> Which format is sent to Kafka ?
>>> 
>>> 
>>> 
>>> On Thu, 22 Nov 2018, 6:59 Scott Sue >>  wrote:
>>> Hi all,
>>> 
>>> When I'm running my jobs I am consuming data from Kafka to process in my
>>> job.  Unfortunately my job receives unexpected data from time to time which
>>> I'm trying to find the root cause of the issue.
>>> 
>>> Ideally, I want to be able to have a way to know when the job has failed due
>>> to an exception, to then log to file the last message that it was consuming
>>> at the time to help track down the offending message consumed.  How is this
>>> possible within Flink?
>>> 
>>> Thinking about this more, it may not be a consumed message that killed the
>>> job, but maybe a transformation within the job itself and it died in a
>>> downstream Operator.  In this case, is there a way to log to file the
>>> message that an Operator was processing at the time that caused the
>>> exception?
>>> 
>>> 
>>> Thanks in advance!
>>> 
>>> 
>>> 
>>> --
>>> Sent from: 
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
>>> 
>>> -- 
>>> 
>>> 
>>> Regards,
>>> Scott
>>> 
>>> SCOTT SUE
>>> CHIEF TECHNOLOGY OFFICER
>>> 
>>> Support Line : +44(0) 2031 371 603 
>>> Mobile : +852 9611 3969 
>>> 
>>> 9/F, 33 Lockhart Road, Wanchai, Hong Kong
>>> www.celer-tech.com 
>>> This message, including any attachments, may include private, privileged 
>>> and confidential information and is intended only for the personal and 
>>> confidential use of the intended recipient(s). If the reader of this 
>>> message is not an intended recipient, you are hereby notified that any 
>>> review, use, dissemination, distribution, printing or copying of this 
>>> message or its contents is strictly prohibited and may be unlawful. If you 
>>> are not an intended recipient or have received this communicati

Re: Logging Kafka during exceptions

2018-11-22 Thread Till Rohrmann
Hi Scott,

I think you could write some wrappers for the different user function types
which could contain the logging logic. That way you would still need to
wrap your actual business logic but don't have to duplicate the logging logic over
and over again.
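
A minimal sketch of such a wrapper for a MapFunction (class and logger names are made up; a RichFunction delegate would additionally need open()/close() forwarded):

import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingMapFunction<IN, OUT> implements MapFunction<IN, OUT> {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingMapFunction.class);

    private final MapFunction<IN, OUT> delegate;

    public LoggingMapFunction(MapFunction<IN, OUT> delegate) {
        this.delegate = delegate;
    }

    @Override
    public OUT map(IN value) throws Exception {
        try {
            return delegate.map(value);
        } catch (Exception e) {
            // log the element that caused the failure before the job goes down
            LOG.error("Failed to process element: {}", value, e);
            throw e;
        }
    }
}

// usage: stream.map(new LoggingMapFunction<>(new MyBusinessMapFunction()))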

If you also want to log the state, then you would need to wrap the
RuntimeContext to interfere all state registering calls so that you can
keep track of them.

Would that work for you?

Cheers,
Till

On Thu, Nov 22, 2018 at 8:44 AM Scott Sue  wrote:

> Yeah I think that would work for incorrect data consumed, but not for if
> deserialization passes correctly, but one of my custom functions
> post deserialization generates an error?
>
>
> Regards,
> Scott
>
> SCOTT SUE
> CHIEF TECHNOLOGY OFFICER
>
> Support Line : +44(0) 2031 371 603
> Mobile : +852 9611 3969
>
> 9/F, 33 Lockhart Road, Wan Chai, Hong Kong
> www.celer-tech.com
>
>
>
>
>
>
>
> On 22 Nov 2018, at 15:15, miki haiat  wrote:
>
> If so, then you can implement your own deserializer[1] with custom
> logic  and error handling
>
>
>
> 1.
> https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.html
>
>
> On Thu, Nov 22, 2018 at 8:57 AM Scott Sue 
> wrote:
>
>> Json is sent into Kafka
>>
>>
>> Regards,
>> Scott
>>
>> SCOTT SUE
>> CHIEF TECHNOLOGY OFFICER
>>
>> Support Line : +44(0) 2031 371 603
>> Mobile : +852 9611 3969
>>
>> 9/F, 33 Lockhart Road, Wan Chai, Hong Kong
>> www.celer-tech.com
>>
>>
>>
>>
>>
>>
>>
>> On 22 Nov 2018, at 14:55, miki haiat  wrote:
>>
>> Which data format   is sent to kafka ?
>> Json Avro Other ?
>>
>>
>>
>> On Thu, Nov 22, 2018 at 7:36 AM Scott Sue 
>> wrote:
>>
>>> Unexpected data meaning business level data that I didn’t expect to
>>> receive. So business level data that doesn’t quite conform
>>>
>>> On Thu, 22 Nov 2018 at 13:30, miki haiat  wrote:
>>>
  Unexpected data you mean parsing error ?
 Which format is sent to Kafka ?



 On Thu, 22 Nov 2018, 6:59 Scott Sue >>>
> Hi all,
>
> When I'm running my jobs I am consuming data from Kafka to process in
> my
> job.  Unfortunately my job receives unexpected data from time to time
> which
> I'm trying to find the root cause of the issue.
>
> Ideally, I want to be able to have a way to know when the job has
> failed due
> to an exception, to then log to file the last message that it was
> consuming
> at the time to help track down the offending message consumed.  How is
> this
> possible within Flink?
>
> Thinking about this more, it may not be a consumed message that killed
> the
> job, but maybe a transformation within the job itself and it died in a
> downstream Operator.  In this case, is there a way to log to file the
> message that an Operator was processing at the time that caused the
> exception?
>
>
> Thanks in advance!
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
 --
>>>
>>>
>>> Regards,
>>> Scott
>>>
>>> SCOTT SUE
>>> CHIEF TECHNOLOGY OFFICER
>>>
>>> Support Line : +44(0) 2031 371 603 <+44%2020%203137%201603>
>>> Mobile : +852 9611 3969 <9611%203969>
>>>
>>> 9/F, 33 Lockhart Road, Wanchai, Hong Kong
>>> www.celer-tech.com
>>>
>>> *This message, including any attachments, may include private,
>>> privileged and confidential information and is intended only for the
>>> personal and confidential use of the intended recipient(s). If the reader
>>> of this message is not an intended recipient, you are hereby notified that
>>> any review, use, dissemination, distribution, printing or copying of this
>>> message or its contents is strictly prohibited and may be unlawful. If you
>>> are not an intended recipient or have received this communication in error,
>>> please immediately notify the sender by telephone and/or a reply email and
>>> permanently delete the original message, including any attachments, without
>>> making a copy.*
>>>
>>
>>
>> *This message, including any attachments, may include private, privileged
>> and confidential information and is intended only for the personal and
>> confidential use of the intended recipient(s). If the reader of this
>> message is not an intended recipient, you are hereby notified that any
>> review, use, dissemination, distribution, printing or copying of this
>> message or its contents is strictly prohibited and may be unlawful. If you
>> are not an intended recipient or have received this communication in error,
>> please immediately notify the sender by telephone and/or a reply email and
>> permanently delete the original message, including any attachments, without
>> making a copy.*
>>
>
>
> *This message, including any attachments, may include private, privileged
> and confidential information and is intended only for the personal and
> confidential use of the intended recipient(s). If

Re: elasticsearch sink can't connect to elastic cluster with BasicAuth

2018-11-22 Thread Till Rohrmann
Hi,

I think you need to a custom `RestClientFactory` which enables basic auth
on the ElasticSearch RestClient according to this documentation [1]. You
can set the RestClientFactory on the ElasticsearchSink.Builder.

[1]
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
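
A rough sketch of such a factory, assuming the REST-based flink-connector-elasticsearch6 (user name, password and the esSinkBuilder variable are placeholders; the transport-client based ES5 sink shown below uses a different mechanism):

import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;

RestClientFactory restClientFactory = restClientBuilder -> {
    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(
            AuthScope.ANY, new UsernamePasswordCredentials("elastic", "password"));
    restClientBuilder.setHttpClientConfigCallback(
            httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
};

// esSinkBuilder is an ElasticsearchSink.Builder<T> of the elasticsearch6 connector
esSinkBuilder.setRestClientFactory(restClientFactory);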

Cheers,
Till

On Thu, Nov 22, 2018 at 9:36 AM hzyuemeng1 
wrote:

>
> after I installed x-pack in my elasticsearch cluster and enabled basic auth on
> the cluster,
> the elasticsearch sink can't connect to the elastic cluster
>
> code like:
>
> DataStream<Tuple2<Boolean, Row>> esSink27 =
> tableEnv13.toRetractStream(esTable26, Row.class).filter(tuple -> tuple.f0);
> //generate user config map
> java.util.Map<String, String> userConfigMap22 =
> com.google.common.collect.Maps.newHashMap();
> userConfigMap22.put("cluster.name", "test-magina");
> userConfigMap22.put("bulk.flush.max.actions", "1");
> //userConfigMap22.put("shield.user", "elastic:magina1001password");
>
> //generate transports list
> Splitter commaSplitter24 = Splitter.on(",");
> Splitter colonSplitter25 = Splitter.on(":");
> List<InetSocketAddress> transportsList23 = Lists.newArrayList();
> for (String transport : commaSplitter24.split("101.206.91.118:9300")) {
> List<String> ipAndPort = colonSplitter25.splitToList(transport);
> transportsList23.add(new
> InetSocketAddress(InetAddress.getByName(ipAndPort.get(0)),
> Integer.parseInt(ipAndPort.get(1))));
> }
> esSink27.addSink(new ElasticsearchSink<Tuple2<Boolean,
> Row>>(userConfigMap22, transportsList23, new
> MaginaES5SinkFunction(esTable26.getSchema().getColumnNames(), "userid",
> "test-au", "test-au", "action,num"), new
> RetryRejectedExecutionFailureHandler())).name("elasticsearch_4068").setParallelism(2);
>
>
>-
>
>Any help will be greatly appreciated
>
>
> hzyuemeng1
> hzyueme...@corp.netease.com
>
> 
> Signature customized by NetEase Mail Master
>


Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread Till Rohrmann
Hi Akshay,

Flink currently does not support automatically distributing hot keys
across different JVMs. What you can do is adapt the parallelism/number
of partitions manually if you find that one partition contains a lot
of hot keys. This might mitigate the problem by spreading the hot keys
across different partitions.
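
For illustration, a sketch of raising the parallelism of the expensive join step in the DataSet API (the data sets, key index, join function and parallelism value are all assumptions):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

// r1 and r2 are DataSet<Tuple2<String, String>> inputs keyed by field 0
DataSet<Tuple2<String, String>> joined = r1
        .join(r2)
        .where(0)
        .equalTo(0)
        .with(new MyJoinFunction()) // hypothetical JoinFunction implementation
        .setParallelism(200);       // spread the hot keys over more parallel instances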

Apart from that, the problem seems to be as Zhijiang indicated that your
join result is quite large. One record is 1 GB large. Try to decrease it or
give more memory to your TMs.

Cheers,
Till

On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole 
wrote:

> Hi Zhijiang,
>  Thanks for the quick reply. My concern is more towards
> how flink perform joins of two *skewed *datasets. Pig
>  and spark
>  seems to support the join
> of skewed datasets. The record size that you are mentioning about in your
> reply is after join operation takes place which is definitely going to be
> huge enough not to fit in jvm task manager task slot in my use case. We
> want to know if there is a way in flink to handle such skewed keys by
> distributing their values across different jvms. Let me know if you need
> more clarity on the issue.
> Thanks,
> Akshay
>
> On Thu, Nov 22, 2018 at 2:38 PM zhijiang 
> wrote:
>
>> Hi Akshay,
>>
>> You encountered an existing issue for serializing large records to cause
>> OOM.
>>
>> Every subpartition would create a separate serializer before, and each
>> serializer would maintain an internal bytes array for storing intermediate
>> serialization results. The key point is that these overhead internal bytes
>> array are not managed by framework, and their size would exceed with the
>> record size dynamically. If your job has many subpartitions with large
>> records, it may probably cause OOM issue.
>>
>> I already improved this issue to some extent by sharing only one
>> serializer for all subpartitions [1], that means we only have one bytes
>> array overhead at most. This issue is covered in release-1.7.
>> Currently the best option may reduce your record size if possible or you
>> can increase the heap size of task manager container.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-9913
>>
>> Best,
>> Zhijiang
>>
>> --
>> From: Akshay Mendole 
>> Sent: Nov 22, 2018 (Thursday) 13:43
>> To: user 
>> Subject: OutOfMemoryError while doing join operation in flink
>>
>> Hi,
>> We are converting one of our pig pipelines to flink using apache
>> beam. The pig pipeline reads two different data sets (R1 & R2)  from hdfs,
>> enriches them, joins them and dumps back to hdfs. The data set R1 is
>> skewed. In a sense, it has few keys with lot of records. When we converted
>> the pig pipeline to apache beam and ran it using flink on a production yarn
>> cluster, we got the following error
>>
>> 2018-11-21 16:52:25,307 ERROR
>> org.apache.flink.runtime.operators.BatchTask  - Error in
>> task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
>> java.lang.RuntimeException: Emitting the record caused an I/O exception:
>> Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds
>> JVM heap space
>> at
>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
>> at
>> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>> at
>> org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
>> at
>> org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
>> at
>> org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
>> at
>> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>> at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>> at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: Failed to serialize element. Serialized
>> size (> 1136656562 bytes) exceeds JVM heap space
>> at
>> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
>> at
>> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>> at java.io.DataOutputStream.write(DataOutputStream.java:107)
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>  

Re: DataSet - Broadcast set in output format

2018-11-22 Thread Till Rohrmann
Hi Bastien,

the OutputFormat specifies how a given record is written to an external
system. The DataSinks using these formats do not support broadcast
variables. This is currently a limitation of Flink.

What you could do is introduce a mapper before your sink which enriches
the records with respect to the broadcast variable. The OutputFormat could
then react to this additional information.
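
A minimal sketch of that pattern (the record type, field and set names are made up):

import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.configuration.Configuration;

DataSet<String> controlSet = env.fromElements("enabled");

DataSet<MyRecord> enriched = records
        .map(new RichMapFunction<MyRecord, MyRecord>() {

            private List<String> control;

            @Override
            public void open(Configuration parameters) {
                // the broadcast set is available in every parallel mapper instance
                control = getRuntimeContext().getBroadcastVariable("control");
            }

            @Override
            public MyRecord map(MyRecord value) {
                value.flag = control.get(0); // enrich the record with the broadcast information
                return value;
            }
        })
        .withBroadcastSet(controlSet, "control");

enriched.output(new MyOutputFormat()); // the output format can then react to the enriched field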

Cheers,
Till

On Thu, Nov 22, 2018 at 2:57 PM bastien dine  wrote:

> Hello,
>
> I would like to use a broadcast variable in my outputformat (to pass some
> information, and control execution flow)
> How would I do it ?
> .output does not have a .withBroadcast function as it does not extends
> SingleInputUdfOperator
>
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>


Re: Passing application configuring to Flink uber jar

2018-11-22 Thread Till Rohrmann
Hi Krishna,

I think the problem is that you are trying to pass in dynamic properties
(-Dconfig.file=dev.conf) to an already started the cluster. The Flink
cluster components or their JVMs need to know the env.java.opts at cluster
start up time and not when the Flink job is submitted. You can check this
by looking into the logs. They contain the parameters with which the JVMs
are started.

You could, however, use the per job mode where you start a cluster per job
(-m yarn-cluster).

Cheers,
Till

On Thu, Nov 22, 2018 at 3:27 PM Krishna Kalyan 
wrote:

> Hello All,
>
> I have a Flink application that inherits configuration from
> application.conf in the resources folder.
>
> Now, I want to run this application on a cluster. I begin creating an fat
> jar "mvn clean assembly".
>
> This jar file is executed by executing the following command below
>
> $FLINK_HOME/bin/flink run --class "com.test" -D 
> "env.java.opts=-Dconfig.file=dev.conf" 
> /opt/resources/iot-flink-assembly-0.1.0.jar
>
>
> This causes and error that the configuration key was not found. I could
> someone please let me know how to pass custom application.conf environment
> file to cluster. I could not find a lot of information online on how to do
> this.
>
> Regards,
> Krishna
>
> Reference:
> [1]
> https://stackoverflow.com/questions/46157479/apache-flink-how-do-i-specify-an-external-application-conf-file?rq=1
> [2] https://github.com/lightbend/config/issues/84
>
>
>
> --
>
> Krishna Kalyan
>
> M +49 151 44159906 <+49%20151%2044159906>
>
> Standorte in Stuttgart und Berlin  · Zoi
> TechCon GmbH · Quellenstr. 7 · 70376 Stuttgart · Geschäftsführer: Benjamin
> Hermann, Dr. Daniel Heubach. Amtsgericht Stuttgart HRB 759619,
> Gerichtsstand Stuttgart. Die genannten Angaben werden automatisch
> hinzugefügt und lassen keine Rückschlüsse auf den Rechtscharakter der
> E-Mail zu. This message (including any attachments) contains confidential
> information intended for a specific individual and purpose, and is
> protected by law. If you are not the intended recipient, you should delete
> this message. Any disclosure, copying, or distribution of this message, or
> the taking of any action based on it, is strictly prohibited.
>
>


Passing application configuring to Flink uber jar

2018-11-22 Thread Krishna Kalyan
Hello All,

I have a Flink application that inherits configuration from
application.conf in the resources folder.

Now, I want to run this application on a cluster. I begin by creating a fat
jar with "mvn clean assembly".

This jar file is then executed with the following command:

$FLINK_HOME/bin/flink run --class "com.test" -D
"env.java.opts=-Dconfig.file=dev.conf"
/opt/resources/iot-flink-assembly-0.1.0.jar


This causes an error that the configuration key was not found. Could
someone please let me know how to pass a custom application.conf environment
file to the cluster? I could not find a lot of information online on how to do
this.

Regards,
Krishna

Reference:
[1]
https://stackoverflow.com/questions/46157479/apache-flink-how-do-i-specify-an-external-application-conf-file?rq=1
[2] https://github.com/lightbend/config/issues/84



-- 

Krishna Kalyan

M +49 151 44159906 <+49%20151%2044159906>

-- 
Standorte in Stuttgart und Berlin  · Zoi 
TechCon GmbH · Quellenstr. 7 · 70376 Stuttgart · Geschäftsführer: Benjamin 
Hermann, Dr. Daniel Heubach. Amtsgericht Stuttgart HRB 759619, 
Gerichtsstand Stuttgart. Die genannten Angaben werden automatisch 
hinzugefügt und lassen keine Rückschlüsse auf den Rechtscharakter der 
E-Mail zu. This message (including any attachments) contains confidential 
information intended for a specific individual and purpose, and is 
protected by law. If you are not the intended recipient, you should delete 
this message. Any disclosure, copying, or distribution of this message, or 
the taking of any action based on it, is strictly prohibited.




DataSet - Broadcast set in output format

2018-11-22 Thread bastien dine
Hello,

I would like to use a broadcast variable in my OutputFormat (to pass some
information and control the execution flow).
How would I do it?
.output does not have a .withBroadcastSet function as it does not extend
SingleInputUdfOperator


--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Re: IntervalJoin is stuck in rocksdb's seek for too long time in flink-1.6.2

2018-11-22 Thread Stefan Richter
Btw how did you make sure that it is stuck in the seek call and that the trace 
does not show different invocations of seek? This can indicate that seek is 
slow, but is not yet proof that you are stuck.

> On 22. Nov 2018, at 13:01, liujiangang  wrote:
> 
> This is not my case. Thank you.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread Akshay Mendole
Hi Zhijiang,
Thanks for the quick reply. My concern is more about how
Flink performs joins of two *skewed* datasets. Pig
and Spark
seem to support the join
of skewed datasets. The record size that you are mentioning in your
reply is the one after the join operation takes place, which in my use case is
definitely going to be too large to fit in a JVM task manager task slot. We
want to know if there is a way in Flink to handle such skewed keys by
distributing their values across different JVMs. Let me know if you need
more clarity on the issue.
Thanks,
Akshay

On Thu, Nov 22, 2018 at 2:38 PM zhijiang  wrote:

> Hi Akshay,
>
> You encountered an existing issue for serializing large records to cause
> OOM.
>
> Every subpartition would create a separate serializer before, and each
> serializer would maintain an internal bytes array for storing intermediate
> serialization results. The key point is that these overhead internal bytes
> array are not managed by framework, and their size would exceed with the
> record size dynamically. If your job has many subpartitions with large
> records, it may probably cause OOM issue.
>
> I already improved this issue to some extent by sharing only one
> serializer for all subpartitions [1], that means we only have one bytes
> array overhead at most. This issue is covered in release-1.7.
> Currently the best option may reduce your record size if possible or you
> can increase the heap size of task manager container.
>
> [1] https://issues.apache.org/jira/browse/FLINK-9913
>
> Best,
> Zhijiang
>
> --
> From: Akshay Mendole 
> Sent: Nov 22, 2018 (Thursday) 13:43
> To: user 
> Subject: OutOfMemoryError while doing join operation in flink
>
> Hi,
> We are converting one of our pig pipelines to flink using apache beam.
> The pig pipeline reads two different data sets (R1 & R2)  from hdfs,
> enriches them, joins them and dumps back to hdfs. The data set R1 is
> skewed. In a sense, it has few keys with lot of records. When we converted
> the pig pipeline to apache beam and ran it using flink on a production yarn
> cluster, we got the following error
>
> 2018-11-21 16:52:25,307 ERROR
> org.apache.flink.runtime.operators.BatchTask  - Error in
> task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
> java.lang.RuntimeException: Emitting the record caused an I/O exception:
> Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds
> JVM heap space
> at
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
> at
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> at
> org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
> at
> org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
> at
> org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
> at
> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
> at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Failed to serialize element. Serialized
> size (> 1136656562 bytes) exceeds JVM heap space
> at
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
> at
> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
> at
> org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
> at java.io.DataOutputStream.write(DataOutputStream.java:107)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> at
> java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
> at
> java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
> at
> org.apache.beam.sdk.coders.Serializ

Re: IntervalJoin is stucked in rocksdb'seek for too long time in flink-1.6.2

2018-11-22 Thread liujiangang
This is not my case. Thank you.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Unbalanced Kafka consumer consumption

2018-11-22 Thread Gerard Garcia
Hi Till,

Sorry for the late reply, I was on holiday and couldn't follow up on the
issue.

1. Flink 1.6.1, Kafka 1.1.0
2. The topic has 64 partitions. We don't have that many slots available but
we could try this.
3. Yes, they are running on different nodes.
4. I meant that the rate does not increase until the operator whose lag
(the amount of messages left to consume) is shown as the blue line (in the
background) finishes consuming all its pending messages.
 Yes, the problem appears when there is pending (persisted) data, either
because the job can't keep up with the incoming rate or because the app has
been stopped.
5. Yes, I guess you mean numRecordsOutPerSecond. We will monitor this to
have data the next time it happens.

I'll try to reduce the number of partitions so they can be assigned one per
source task and see how it behaves.

Thanks,

Gerard
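
For reference, pinning one partition per source task (Till's point 2) only requires setting the source operator's parallelism to the partition count. A minimal sketch, assuming Flink 1.6 with the Kafka 0.11 connector (which also works against Kafka 1.1.0 brokers); the broker address, group id and topic name are placeholders:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class OnePartitionPerSourceTask {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // assumption: adjust to your brokers
        props.setProperty("group.id", "example-group");       // assumption: illustrative group id

        // With 64 partitions and source parallelism 64, each source subtask owns exactly one partition.
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer011<>("example-topic", new SimpleStringSchema(), props))
                .setParallelism(64);

        stream.print();
        env.execute("one-partition-per-source-task");
    }
}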

On Wed, Nov 7, 2018 at 5:38 PM Till Rohrmann  wrote:

> Hi Gerard,
>
> the behaviour you are describing sounds odd to me. I have a couple of
> questions:
>
> 1. Which Flink and Kafka version are you using?
> 2. How many partitions do you have? --> Try to set the parallelism of your
> job to the number of partitions. That way, you will have one partition per
> source task.
> 3. How are the source operators distributed? Are they running on different
> nodes?
> 4. What do you mean with "until it (the blue one) was finished consuming
> the partition"? I assume that you don't ingest into the Kafka topic live
> but want to read persisted data.
> 5. Are you using Flink's metrics to monitor the different source tasks?
> Check what the source operator's output rate is (should be visible from the
> web UI).
>
> Cheers,
> Till
>
> On Tue, Oct 30, 2018 at 10:27 AM Gerard Garcia  wrote:
>
>> I think my problem is not the same: yours is that you want to consume
>> from partitions with more data faster, to avoid first consuming the ones with
>> fewer elements, which could advance the event time too fast. Mine is that
>> Kafka only consumes from some partitions even if it seems that it has the
>> resources to read and process all of them at the same time.
>>
>> Gerard
>>
>> On Tue, Oct 30, 2018 at 9:36 AM bupt_ljy  wrote:
>>
>>> Hi,
>>>
>>>If I understand your problem correctly, there is a similar JIRA
>>> issue FLINK-10348, reported by me. Maybe you can take a look at it.
>>>
>>>
>>> Jiayi Liao,Best
>>>
>>>  Original Message
>>> *Sender:* Gerard Garcia
>>> *Recipient:* fearsome.lucidity
>>> *Cc:* user
>>> *Date:* Monday, Oct 29, 2018 17:50
>>> *Subject:* Re: Unbalanced Kafka consumer consumption
>>>
>>> The stream is partitioned by key after ingestion at the finest
>>> granularity that we can (which is finer than how the stream is partitioned
>>> when produced to Kafka). It is not perfectly balanced but still is not so
>>> unbalanced as to show this behavior (more balanced than what the lag images
>>> show).
>>>
>>> Anyway, let's assume that the problem is that the stream is so
>>> unbalanced that one operator subtask can't handle the ingestion rate. Is it
>>> expected then that all the other operator subtasks reduce their ingestion
>>> rate even if they have resources to spare? The task is configured with
>>> processing time and there are no windows. If that is the case, is there a
>>> way to let operator subtasks process freely even if one of them is causing
>>> back pressure upstream?
>>>
>>> The attached images show how Kafka lag increases while the throughput
>>> is stable until some operator subtasks finish.
>>>
>>> Thanks,
>>>
>>> Gerard
>>>
>>> On Fri, Oct 26, 2018 at 8:09 PM Elias Levy 
>>> wrote:
>>>
 You can always shuffle the stream generated by the Kafka source
 (dataStream.shuffle()) to evenly distribute records downstream.

 On Fri, Oct 26, 2018 at 2:08 AM gerardg  wrote:

> Hi,
>
> We are experiencing issues scaling our Flink application and we have
> observed that it may be because Kafka message consumption is not balanced
> across partitions. The attached image (lag per partition) shows how only one
> partition consumes messages (the blue one in the back) and it wasn't until
> it finished that the other ones started to consume at a good rate (actually
> the total throughput multiplied by 4 when these started). Also, when those
> ones started to consume, one partition just stopped and accumulated messages
> again until they finished.
>
> We don't see any resource (CPU, network, disk..) struggling in our
> cluster
> so we are not sure what could be causing this behavior. I can only
> assume
> that somehow Flink or the Kafka consumer is artificially slowing down
> the
> other partitions. Maybe due to how back pressure is handled?
>
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1007/consumer_max_lag.png>
>
>
> Gerard
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user

Re: Flink JSON (string) to Pojo (and vice versa) example

2018-11-22 Thread Fabian Hueske
Thanks Flavio.
This looks very useful.

AFAIK, the Calcite community is also working on functions for JSON <-> Text
conversions which are part of SQL:2016.
Hopefully, we can leverage their implementations in Flink's SQL support.

Best, Fabian

On Tue, Nov 20, 2018 at 18:27, Flavio Pompermaier <
pomperma...@okkam.it> wrote:

> Hi everybody,
> since here at Okkam we didn't find any "native" Flink map functions that
> allow converting JSON strings to POJOs (and vice versa), we decided to
> share with the Flink community a simple implementation for these 2 tasks:
>  - JSON (string) to POJO [2]
>  - POJO to JSON (string) [3].
> A Flink main class that uses those 2 functions can be found at [1].
>
> Any feedback is welcome!
>
> Best,
> Flavio
>
> [1]
> https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/flink/Json2PojoExample.java
> [2]
> https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/flink/json/JsonStringToPojo.java
> [3]
> https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/flink/json/Pojo2JsonString.java
>
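
For readers who only need the gist: the conversion boils down to a Jackson ObjectMapper inside a MapFunction. A minimal sketch of the same idea, assuming jackson-databind on the classpath (the Person POJO and its fields are illustrative assumptions, not the classes from the linked repository):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonPojoSketch {

    /** Illustrative POJO; any Jackson-compatible bean works. */
    public static class Person {
        public String name;
        public int age;

        public Person() {}
    }

    /** JSON string -> POJO. */
    public static class JsonToPerson implements MapFunction<String, Person> {
        private transient ObjectMapper mapper;

        @Override
        public Person map(String json) throws Exception {
            if (mapper == null) {
                mapper = new ObjectMapper(); // created lazily because ObjectMapper is not Serializable
            }
            return mapper.readValue(json, Person.class);
        }
    }

    /** POJO -> JSON string. */
    public static class PersonToJson implements MapFunction<Person, String> {
        private transient ObjectMapper mapper;

        @Override
        public String map(Person person) throws Exception {
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
            return mapper.writeValueAsString(person);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> json = env.fromElements("{\"name\":\"alice\",\"age\":30}");
        DataStream<Person> pojos = json.map(new JsonToPerson());
        DataStream<String> backToJson = pojos.map(new PersonToJson());

        backToJson.print();
        env.execute("json-pojo-sketch");
    }
}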


Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread zhijiang
Hi Akshay,

You encountered an existing issue where serializing large records causes OOM.

Previously, every subpartition would create a separate serializer, and each
serializer would maintain an internal byte array for storing intermediate
serialization results. The key point is that these internal byte arrays are not
managed by the framework, and their size grows dynamically with the record
size. If your job has many subpartitions with large records, it may well cause
an OOM issue.

I already improved this issue to some extent by sharing only one serializer for
all subpartitions [1], which means we only have one byte array overhead at
most. This improvement is included in release-1.7.
Currently the best option is to reduce your record size if possible or to
increase the heap size of the task manager container.

[1] https://issues.apache.org/jira/browse/FLINK-9913

Best,
Zhijiang
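
To make the "reduce your record size" option concrete: if a single element currently carries all values of a key (which a CoGBK/GroupReduce easily produces), emitting one bounded-size record per value keeps every serialized element small. A plain Flink DataSet sketch of that shape, with illustrative tuple types and placeholder input:

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class SmallRecordsSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, String>> input = env.fromElements(
                Tuple2.of("hotKey", "v1"), Tuple2.of("hotKey", "v2"), Tuple2.of("otherKey", "v3"));

        // Anti-pattern: concatenating all values of a key into one giant record makes the
        // serialized element grow with the key's cardinality and can exceed the heap.
        // Preferred shape: emit one bounded-size record per value so no single element gets huge.
        DataSet<Tuple2<String, String>> output = input
                .groupBy(0)
                .reduceGroup(new GroupReduceFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                    @Override
                    public void reduce(Iterable<Tuple2<String, String>> values,
                                       Collector<Tuple2<String, String>> out) {
                        for (Tuple2<String, String> value : values) {
                            // Enrich/transform per element and emit immediately,
                            // instead of buffering everything for the key.
                            out.collect(Tuple2.of(value.f0, "enriched-" + value.f1));
                        }
                    }
                });

        output.print();
    }
}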
--
From: Akshay Mendole 
Date: Thursday, November 22, 2018, 13:43
To: user 
Subject: OutOfMemoryError while doing join operation in flink

Hi,
We are converting one of our Pig pipelines to Flink using Apache Beam. The
Pig pipeline reads two different data sets (R1 & R2) from HDFS, enriches them,
joins them and dumps them back to HDFS. The data set R1 is skewed: it
has a few keys with a lot of records. When we converted the Pig pipeline to
Apache Beam and ran it using Flink on a production YARN cluster, we got the
following error:

2018-11-21 16:52:25,307 ERROR org.apache.flink.runtime.operators.BatchTask  
- Error in task code:  GroupReduce (GroupReduce at CoGBK/GBK) 
(25/100)
java.lang.RuntimeException: Emitting the record caused an I/O exception: Failed 
to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap 
space
at 
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
at 
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at 
org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
at 
org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
at 
org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
at 
org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to serialize element. Serialized size (> 
1136656562 bytes) exceeds JVM heap space
at 
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
at 
org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
at 
org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at 
java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
at 
java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at 
java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
at 
org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
at 
org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at 
org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
at 
org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
at 
org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
at 
org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
at 
org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
at

Re: Deadlock happens when sink to mysql

2018-11-22 Thread Fabian Hueske
Hi,

Which TableSource and TableSink do you use?

Best, Fabian

On Mon, Nov 19, 2018 at 15:39, miki haiat wrote:

> can you share your entire code please
>
> On Mon, Nov 19, 2018 at 4:03 PM 徐涛  wrote:
>
>> Hi Experts,
>> I use the following SQL and sink the result to MySQL:
>>
>> select
>> album_id, date,
>> count(1)
>> from
>> coupon_5_discount_date_conv
>> group by
>> album_id, date;
>>
>> When sinking to MySQL, the following SQL is executed: insert into xxx (c1,
>> c2, c3) values (?,?,?) on duplicate key update c1=VALUES(c1), c2=VALUES(c2
>> ), c3=VALUES(c3)
>> The engine is InnoDB, columns c1, c2 form a unique key, and the isolation
>> level is READ COMMITTED. But a deadlock exception shows up in the log.
>> As far as I know, because the unique key exists, only a row lock will be
>> applied, no gap lock. And due to the group by clause, the
>> same unique key should be written by the same thread. So in this case,
>> why would the deadlock happen? Could anyone help me? Thanks a lot.
>>
>> Best
>> Henry
>>
>
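
Whatever the root cause turns out to be, a pragmatic mitigation on the sink side is to retry the upsert when MySQL reports a deadlock (error code 1213), since InnoDB simply rolls back the victim transaction. A hedged JDBC sketch of that pattern; the table and column names are illustrative, not the ones from the job above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class UpsertWithDeadlockRetry {

    private static final int MYSQL_ER_LOCK_DEADLOCK = 1213;
    private static final int MAX_RETRIES = 3;

    public static void upsert(String jdbcUrl, String user, String password,
                              String albumId, String date, long count) throws SQLException {
        // Illustrative upsert; mirrors the "on duplicate key update" statement discussed above.
        String sql = "insert into album_counts (album_id, dt, cnt) values (?, ?, ?) "
                + "on duplicate key update cnt = VALUES(cnt)";

        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password)) {
            for (int attempt = 1; ; attempt++) {
                try (PreparedStatement stmt = conn.prepareStatement(sql)) {
                    stmt.setString(1, albumId);
                    stmt.setString(2, date);
                    stmt.setLong(3, count);
                    stmt.executeUpdate();
                    return; // success
                } catch (SQLException e) {
                    // InnoDB chose this statement as the deadlock victim; it is safe to retry.
                    if (e.getErrorCode() == MYSQL_ER_LOCK_DEADLOCK && attempt < MAX_RETRIES) {
                        continue;
                    }
                    throw e;
                }
            }
        }
    }
}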


Re: IntervalJoin is stucked in rocksdb'seek for too long time in flink-1.6.2

2018-11-22 Thread Stefan Richter
Hi,

are your RocksDB instances running on local SSDs or on something like EBS? I
have previously seen cases where this happened because some EBS quota was
exhausted and the performance got throttled.

Best,
Stefan
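
If the state does sit on slower storage, one knob worth checking besides the disk itself is the RocksDB state backend's predefined options profile. A minimal sketch, assuming the RocksDBStateBackend API as of Flink 1.6 and a placeholder checkpoint path:

import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbOptionsSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder checkpoint URI; point this at your real HDFS/S3 path in production.
        RocksDBStateBackend backend = new RocksDBStateBackend("file:///tmp/flink-checkpoints", true);

        // Pick the profile that matches the storage of the RocksDB working directories:
        // SPINNING_DISK_OPTIMIZED_HIGH_MEM trades memory for fewer disk seeks,
        // FLASH_SSD_OPTIMIZED assumes local SSDs.
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);

        env.setStateBackend(backend);

        // Tiny placeholder pipeline so the sketch runs standalone; replace with the real intervalJoin job.
        env.fromElements(1, 2, 3).print();

        env.execute("rocksdb-options-sketch");
    }
}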

> On 22. Nov 2018, at 09:51, liujiangang  wrote:
> 
> Thank you very much. Some more details: each record is 20KB, the
> parallelism is 500 and each taskmanager has 10G of memory. The memory is
> enough, and I think the parallelism is big enough. Only the intervalJoin
> thread is above 100% CPU because of RocksDB's seek. I am confused why
> RocksDB's seek takes so long but returns no result. I don't know how to
> debug RocksDB in Flink.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: IntervalJoin is stucked in rocksdb'seek for too long time in flink-1.6.2

2018-11-22 Thread liujiangang
Thank you very much. Some more details: each record is 20KB, the
parallelism is 500 and each taskmanager has 10G of memory. The memory is
enough, and I think the parallelism is big enough. Only the intervalJoin
thread is above 100% CPU because of RocksDB's seek. I am confused why
RocksDB's seek takes so long but returns no result. I don't know how to
debug RocksDB in Flink.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


elasticsearch sink can't connect to elastic cluster with BasicAuth

2018-11-22 Thread hzyuemeng1






After I installed X-Pack in my Elasticsearch cluster and enabled basic auth,
the Elasticsearch sink can't connect to the cluster.

Code like:

DataStream<Tuple2<Boolean, Row>> esSink27 = tableEnv13
        .toRetractStream(esTable26, Row.class)
        .filter(tuple -> tuple.f0);

// generate user config map
java.util.Map<String, String> userConfigMap22 = com.google.common.collect.Maps.newHashMap();
userConfigMap22.put("cluster.name", "test-magina");
userConfigMap22.put("bulk.flush.max.actions", "1");
//userConfigMap22.put("shield.user", "elastic:magina1001password");

// generate transports list
Splitter commaSplitter24 = Splitter.on(",");
Splitter colonSplitter25 = Splitter.on(":");
List<InetSocketAddress> transportsList23 = Lists.newArrayList();
for (String transport : commaSplitter24.split("101.206.91.118:9300")) {
    List<String> ipAndPort = colonSplitter25.splitToList(transport);
    transportsList23.add(new InetSocketAddress(
            InetAddress.getByName(ipAndPort.get(0)), Integer.parseInt(ipAndPort.get(1))));
}

esSink27.addSink(new ElasticsearchSink<Tuple2<Boolean, Row>>(
        userConfigMap22,
        transportsList23,
        new MaginaES5SinkFunction(esTable26.getSchema().getColumnNames(),
                "userid", "test-au", "test-au", "action,num"),
        new RetryRejectedExecutionFailureHandler()))
    .name("elasticsearch_4068")
    .setParallelism(2);

Any help will be greatly appreciated.

hzyuemeng1
hzyueme...@corp.netease.com