Re: HashMap/HashSet Serialization Issue

2017-01-06 Thread Chen Qin
My understanding is that HashMap doesn't work with Flink's native
serialization framework, though I might be wrong.

This might be worth reading:
https://cwiki.apache.org/confluence/display/FLINK/Type+System,+Type+Extraction,+Serialization

-Chen
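For context (an editor's sketch, not from the thread): Flink's TypeExtractor treats a class as a POJO only if its state is reachable as public fields or via getters/setters. `java.util.HashMap` and `java.util.HashSet` keep their state in private fields with no bean-style accessors, which is why the log says "No fields detected" and Flink falls back to GenericType (Kryo). The plain-JDK snippet below illustrates that one check; the class and method names are invented for illustration and are not Flink's actual code.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Illustrative sketch (not Flink's actual code): count the public instance
// fields a class exposes. HashMap/HashSet expose none, so they cannot be
// analyzed as POJO types and are handled as GenericType instead.
public class Main {
    static long publicInstanceFields(Class<?> clazz) {
        long count = 0;
        for (Field f : clazz.getFields()) {            // public fields only
            if (!Modifier.isStatic(f.getModifiers())) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Both print 0: no public instance fields, hence "No fields detected".
        System.out.println(publicInstanceFields(java.util.HashMap.class));
        System.out.println(publicInstanceFields(java.util.HashSet.class));
    }
}
```

The usual way around this is to accept the Kryo fallback (it works, just slower) or to wrap the collections in a proper POJO with public fields.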

On Fri, Jan 6, 2017 at 6:06 PM, Charith Wickramarachchi <
charith.dhanus...@gmail.com> wrote:

> Hi All,
>
> I am using flink-gelly and using a custom POJO type as the
> VertexValue/MessageType (I am using the vertex-centric model). The POJO
> contains HashMap/HashSet as members. While executing the job I get the
> following log message:
>
> 17:50:53,582 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
> - No fields detected for class java.util.HashSet. Cannot be used as
> a PojoType. Will be handled as GenericType
> 17:50:53,583 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
> - class java.util.HashMap is not a valid POJO type
>
> Is there a way to resolve this issue?
>
> Thanks,
> Charith
>
>
> --
> Charith Dhanushka Wickramaarachchi
>
> Tel  +1 213 447 4253
> Blog  http://charith.wickramaarachchi.org/
> 
> Twitter  @charithwiki 
>
> This communication may contain privileged or other confidential information
> and is intended exclusively for the addressee/s. If you are not the
> intended recipient/s, or believe that you may have
> received this communication in error, please reply to the sender indicating
> that fact and delete the copy you received and in addition, you should
> not print, copy, retransmit, disseminate, or otherwise use the
> information contained in this communication. Internet communications
> cannot be guaranteed to be timely, secure, error or virus-free. The
> sender does not accept liability for any errors or omissions
>


HashMap/HashSet Serialization Issue

2017-01-06 Thread Charith Wickramarachchi
Hi All,

I am using flink-gelly and using a custom POJO type as the
VertexValue/MessageType (I am using the vertex-centric model). The POJO
contains HashMap/HashSet as members. While executing the job I get the
following log message:

17:50:53,582 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
  - No fields detected for class java.util.HashSet. Cannot be used as a
PojoType. Will be handled as GenericType
17:50:53,583 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
  - class java.util.HashMap is not a valid POJO type

Is there a way to resolve this issue?

Thanks,
Charith


-- 
Charith Dhanushka Wickramaarachchi

Tel  +1 213 447 4253
Blog  http://charith.wickramaarachchi.org/

Twitter  @charithwiki 



Re: Increasing parallelism skews/increases overall job processing time linearly

2017-01-06 Thread Stephan Ewen
Hi!

You are right, parallelism 2 should be faster than parallelism 1 ;-) As
Chen Qin pointed out, having only 2 Kafka partitions may prevent further
scale-out.

A few things to check:
  - How are you connecting the FlatMap and CoFlatMap? Default, keyBy,
broadcast?
  - Broadcast, for example, would multiply the data with the parallelism,
which can lead to a slowdown when the network is saturated.
  - Are you using the standard Kafka source (and which Kafka version)?
  - Is there any part of the program that multiplies data/effort with
higher parallelism (does the FlatMap explode data based on parallelism)?

Stephan
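Chen Qin's point about the two partitions can be sketched with a small, hypothetical partition-assignment simulation (plain Java; this is not Flink's actual Kafka source code, just the round-robin idea): with 2 partitions, at most 2 source subtasks ever receive data, however large the parallelism.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink's actual assignment code: distribute topic
// partitions over source subtasks round-robin, as the Kafka source
// conceptually does. Subtasks beyond the partition count stay idle.
public class Main {
    static Map<Integer, List<Integer>> assign(int partitions, int parallelism) {
        Map<Integer, List<Integer>> bySubtask = new HashMap<>();
        for (int s = 0; s < parallelism; s++) {
            bySubtask.put(s, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            bySubtask.get(p % parallelism).add(p);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // 2 partitions, parallelism 4: only 2 subtasks get any data.
        Map<Integer, List<Integer>> a = assign(2, 4);
        long busySubtasks = a.values().stream().filter(l -> !l.isEmpty()).count();
        System.out.println(busySubtasks); // prints 2
    }
}
```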


On Fri, Jan 6, 2017 at 7:27 PM, Chen Qin  wrote:

> Just noticed there are only two partitions per topic. Regardless of how
> large the parallelism is set, at most two subtasks will get a partition
> assigned.
>
> Sent from my iPhone
>
> On Jan 6, 2017, at 02:40, Chakravarthy varaga 
> wrote:
>
> Hi All,
>
> Any updates on this?
>
> Best Regards
> CVP
>
> On Thu, Jan 5, 2017 at 1:21 PM, Chakravarthy varaga <
> chakravarth...@gmail.com> wrote:
>
>>
>> Hi All,
>>
>> I have a job as attached.
>>
>> I have a 16-core blade running RHEL 7. The taskmanager default number of
>> slots is set to 1. The source is a Kafka stream and each of the 2
>> sources (topics) has 2 partitions.
>>
>>
>> *What I notice is that when I deploy the job with #parallelism=2, the
>> total processing time doubles compared to the same job deployed with
>> #parallelism=1. It increases linearly with the parallelism.*
>> Since the number of slots is set to 1 per TM, I would assume that the job
>> would be processed in parallel in 2 different TMs and that each consumer in
>> each TM is connected to 1 partition of the topic. This should therefore
>> have kept the overall processing time the same or less!
>>
>> The co-flatmap connects the 2 streams and uses ValueState (checkpointed in
>> FS). I think this is distributed among the TMs. My understanding is that
>> the lookup of value state could be costly across TMs. Do you sense
>> something wrong here?
>>
>> Best Regards
>> CVP
>>
>>
>>
>>
>>
>


Re: Increasing parallelism skews/increases overall job processing time linearly

2017-01-06 Thread Chen Qin
Just noticed there are only two partitions per topic. Regardless of how large 
the parallelism is set, at most two subtasks will get a partition assigned.

Sent from my iPhone

> On Jan 6, 2017, at 02:40, Chakravarthy varaga  
> wrote:
> 
> Hi All,
> 
> Any updates on this?
> 
> Best Regards
> CVP
> 
>> On Thu, Jan 5, 2017 at 1:21 PM, Chakravarthy varaga 
>>  wrote:
>> 
>> Hi All,
>> 
>> I have a job as attached.
>> 
>> I have a 16-core blade running RHEL 7. The taskmanager default number of 
>> slots is set to 1. The source is a Kafka stream and each of the 2 
>> sources (topics) has 2 partitions.
>> What I notice is that when I deploy the job with #parallelism=2, the 
>> total processing time doubles compared to the same job deployed with 
>> #parallelism=1. It increases linearly with the parallelism.
>> 
>> Since the number of slots is set to 1 per TM, I would assume that the job 
>> would be processed in parallel in 2 different TMs and that each consumer in 
>> each TM is connected to 1 partition of the topic. This should therefore have 
>> kept the overall processing time the same or less!
>> 
>> The co-flatmap connects the 2 streams and uses ValueState (checkpointed in 
>> FS). I think this is distributed among the TMs. My understanding is that the 
>> lookup of value state could be costly across TMs. Do you sense something 
>> wrong here?
>> 
>> Best Regards
>> CVP
>> 
>> 
>> 
>> 
> 


Re: failure-rate restart strategy not working?

2017-01-06 Thread Shannon Carey
I think I figured it out: the problem is due to Flink's behavior when a job has 
checkpointing enabled.

When the job graph is created, if checkpointing is enabled but a restart 
strategy hasn't been programmatically configured, Flink changes the job graph's 
execution config to use the fixed delay restart strategy. That gets serialized 
with the job graph. Then, when the JobManager deserializes the execution 
config, it sees that there's a restart strategy configured for the job and uses 
that instead of using the restart strategy that's configured on the cluster.

Clearly, the documentation needs to be adjusted. Maybe I can add some changes 
to https://github.com/apache/flink/pull/3059

However, should we also consider some implementation changes? Is it intentional 
that enabling checkpointing overrides the restart strategy set on the cluster, 
and that the only way to control the restart strategy of a checkpointed job is 
to set it programmatically? If not, would it be reasonable to only set the 
fixed-delay restart strategy if checkpointing is enabled AND the cluster 
doesn't explicitly configure one? Flink would no longer use the execution 
config to control the strategy, but would instead do it in 
JobManager#submitJob().

-Shannon

From: Shannon Carey
Date: Thursday, January 5, 2017 at 1:50 PM
To: "user@flink.apache.org"
Subject: failure-rate restart strategy not working?

I recently updated my cluster with the following config:

restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

I see the settings inside the JobManager web UI, as expected. I am not setting 
the restart-strategy programmatically, but the job does have checkpointing 
enabled.

However, if I launch a job that (intentionally) fails every 10 seconds by 
throwing a RuntimeException, it continues to restart beyond the limit of 3 
failures.
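For readers puzzling over the expected behavior: if the configured failure-rate strategy were actually in effect, its semantics (as I read the config above; this is an illustrative plain-Java sketch, not Flink's implementation) would stop restarts once more than 3 failures fall inside a trailing 5-minute window.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative sketch of failure-rate semantics, not Flink's implementation:
// a restart is allowed as long as the number of failures within the trailing
// interval does not exceed the configured maximum.
public class Main {
    static boolean canRestart(Deque<Long> failureTimes, long nowMs,
                              int maxFailures, long intervalMs) {
        failureTimes.addLast(nowMs);
        // Drop failures that fell out of the trailing interval.
        while (!failureTimes.isEmpty()
                && failureTimes.peekFirst() < nowMs - intervalMs) {
            failureTimes.removeFirst();
        }
        return failureTimes.size() <= maxFailures;
    }

    public static void main(String[] args) {
        Deque<Long> failures = new ArrayDeque<>();
        long fiveMinutes = 5 * 60 * 1000L;
        // A job failing every 10 s: the 4th failure exceeds 3-per-5-minutes,
        // so the job should stop restarting there.
        for (int i = 0; i < 4; i++) {
            System.out.println(canRestart(failures, i * 10_000L, 3, fiveMinutes));
        }
    }
}
```

A job that keeps restarting past this point is therefore strong evidence that a different strategy (e.g. fixed-delay) is in effect, as diagnosed above.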

Does anyone know why this might be happening? Any ideas of things I could check?

Thanks!
Shannon


Re: Increasing parallelism skews/increases overall job processing time linearly

2017-01-06 Thread Chakravarthy varaga
Hi All,

Any updates on this?

Best Regards
CVP

On Thu, Jan 5, 2017 at 1:21 PM, Chakravarthy varaga <
chakravarth...@gmail.com> wrote:

>
> Hi All,
>
> I have a job as attached.
>
> I have a 16-core blade running RHEL 7. The taskmanager default number of
> slots is set to 1. The source is a Kafka stream and each of the 2
> sources (topics) has 2 partitions.
>
>
> *What I notice is that when I deploy the job with #parallelism=2, the
> total processing time doubles compared to the same job deployed with
> #parallelism=1. It increases linearly with the parallelism.*
> Since the number of slots is set to 1 per TM, I would assume that the job
> would be processed in parallel in 2 different TMs and that each consumer in
> each TM is connected to 1 partition of the topic. This should therefore
> have kept the overall processing time the same or less!
>
> The co-flatmap connects the 2 streams and uses ValueState (checkpointed in
> FS). I think this is distributed among the TMs. My understanding is that
> the lookup of value state could be costly across TMs. Do you sense
> something wrong here?
>
> Best Regards
> CVP
>
>
>
>
>


Re: Does Flink cluster security work in Flink 1.1.4 release?

2017-01-06 Thread Stephan Ewen
I think you can also use Kerberos in standalone mode in 1.1.x, but it
is more tricky - you need to do a "kinit" on every host where you launch a
Flink process.

Flink 1.2 has better Kerberos support.

On Fri, Jan 6, 2017 at 4:19 AM, Zhangrucong  wrote:

> Hi Stephan:
>
>   Thanks for your reply.
>
> You mean the CLI, JM, TM, and WebUI support Kerberos authentication only
> in the YARN cluster mode in the 1.1.x release?
>
>
>
> *From:* Stephan Ewen [mailto:se...@apache.org ]
> *Sent:* January 4, 2017 17:23
> *To:* user@flink.apache.org
> *Subject:* Re: Does Flink cluster security work in Flink 1.1.4 release?
>
>
>
> Hi!
>
>
>
> Flink 1.1.x supports Kerberos for Hadoop (HDFS, YARN, HBase) via Hadoop's
> ticket system. It should work via kinit, in the same way when submitting a
> secure MapReduce job.
>
>
>
> Kerberos for ZooKeeper, Kafka, etc, is only part of the 1.2 release.
>
>
>
> Greetings,
>
> Stephan
>
>
>
>
>
> On Wed, Jan 4, 2017 at 7:25 AM, Zhangrucong 
> wrote:
>
> Hi:
>
>  Now I use the Flink 1.1.4 release in standalone cluster mode. I want to
> do Kerberos authentication between the Flink CLI and the JobManager. But in
> flink-conf.yaml, there is no Flink cluster security configuration.
>
> Does Kerberos authentication work in the Flink 1.1.4 release?
>
> Thanks in advance!
>
>
>
>
>


Re: Some questions about Flink stream SQL

2017-01-06 Thread Jark Wu
Hi Yuhong, 

The design doc is not ready. You can refer to FLINK-4679. There are some 
discussions under the issue.

- Jark Wu 

> On Jan 6, 2017, at 3:08 PM, Hongyuhong wrote:
> 
> Thanks Jark
>  
> Do you have any design documents or instructions about row windows? We are 
> interested in adding support for windows on the stream SQL side, and it 
> would be great to have some reference.
> Thanks very much.
>  
> Best
> Yuhong
>  
>  
> From: Jark Wu [mailto:wuchong...@alibaba-inc.com] 
> Sent: January 6, 2017 11:01
> To: user@flink.apache.org
> Subject: Re: Some questions about Flink stream SQL
>  
> Hi Yuhong, 
>  
> I have assigned an issue for the tumble row-window, but it is still under 
> design and discussion. The implementation of row windows is more complex 
> than group windows.
> I will push this issue forward in the coming days.
>  
> - Jark Wu 
>  
> On Jan 5, 2017, at 7:00 PM, Hongyuhong wrote:
>  
> Hi Fabian,
>  
> Thanks for the reply.
> As you noticed, row windows are already supported by Calcite and are 
> planned in FLIP-11.
> Can you tell me about the progress of the row windows in the Table API?
>  
> Regards.
> Yuhong
>  
>  
>  
>  
> From: Fabian Hueske [mailto:fhue...@gmail.com ] 
> Sent: January 5, 2017 17:43
> To: user@flink.apache.org 
> Subject: Re: Some questions about Flink stream SQL
>  
> Hi Yuhong,
> 
> as you noticed, FLIP-11 is about the window operations of the Table API and 
> does not include SQL.
> The reason is that the Table API is completely in Flink's domain, i.e., we 
> can design and implement the API. For SQL we have a dependency on Calcite.
> 
> You are right that Calcite's JIRA issue for group windows [1] seems to be 
> stale. I don't have more information on that. You could ping there and ask 
> what the status is.
> Row windows are already supported by Calcite (SQL's OVER and WINDOW clauses), 
> so these could be implemented for SQL as well.
> Maybe it's even easier to start from the SQL side and add the Table API later 
> to ensure a compatible compilation process.
> 
> Best, Fabian
>  
> 
>  
2017-01-05 4:40 GMT+01:00 Hongyuhong:
> Hi,
> We are currently exploring Flink stream SQL.
> I see the group window has been implemented in the Table API, and the row 
> window is also planned in FLIP-11. It seems that the row-window grammar is 
> more similar to Calcite's OVER clause.
> I'm curious about the detailed plan and roadmap of stream SQL, because 
> FLIP-11 only mentions the Table API.
> Will stream SQL implement the row window first? Or, if the group window is 
> considered, what is the status of the Calcite integration?
> The issue in the Calcite JIRA was raised in August; what's its status right now?
> Thanks in advance!
>  
> Regards
> Yuhong
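For readers following the thread, the OVER-clause syntax discussed above (which Calcite already parses) looks roughly like this hypothetical row-window query; the table and column names are invented for illustration, and Flink did not yet support this at the time of the thread:

```sql
-- Hypothetical row-window query in the OVER syntax Calcite parses
-- (table and column names are made up for illustration):
SELECT
  user_id,
  SUM(amount) OVER (
    PARTITION BY user_id
    ORDER BY rowtime
    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
  ) AS running_sum
FROM payments
```

This is the kind of statement a row window in the sense of FLIP-11 would map to, in contrast to the GROUP BY-based group windows.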