Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-10 Thread Jörn Franke
Would it maybe make sense to provide Flink as an engine on Hive 
(„flink-on-Hive“)? E.g. to address 4, 5, 6, 8, 9, 10. This could be more loosely 
coupled than integrating Hive in all possible Flink core modules and thus 
introducing a very tight dependency on Hive in the core.
1, 2, 3 could be achieved via a connector based on the Flink Table API.
Just a proposal to start this endeavour as independent projects (Hive 
engine, connector) to avoid too tight a coupling with Flink. Maybe, in a more 
distant future, if the Hive integration is heavily demanded, one could then 
integrate it more tightly if needed. 
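
As a rough sketch of what the connector part (1, 2, 3) could look like through 
the Table API — HiveExternalCatalog and hiveConf are hypothetical placeholders, 
not existing classes, and this assumes the ExternalCatalog support that the 
Table API already ships:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// A catalog implementation backed by the Hive metastore would cover 1 and 2.
tableEnv.registerExternalCatalog("hive", new HiveExternalCatalog(hiveConf))

// Hive tables would then resolve with a catalog-qualified name in SQL queries (3).
val result = tableEnv.sqlQuery("SELECT * FROM hive.db1.orders")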

What is meant by 11?
> Am 11.10.2018 um 05:01 schrieb Zhang, Xuefu :
> 
> Hi Fabian/Vino,
> 
> Thank you very much for your encouragement and inquiry. Sorry that I didn't see 
> Fabian's email until I read Vino's response just now. (Somehow Fabian's went 
> to the spam folder.)
> 
> My proposal contains long-term and short-term goals. Nevertheless, the 
> effort will focus on the following areas, including Fabian's list:
> 
> 1. Hive metastore connectivity - This covers both read/write access, which 
> means Flink can make full use of Hive's metastore as its catalog (at least 
> for batch, but this can be extended to streaming as well).
> 2. Metadata compatibility - Objects (databases, tables, partitions, etc.) 
> created by Hive can be understood by Flink, and the reverse direction is true 
> as well.
> 3. Data compatibility - Similar to #2, data produced by Hive can be consumed 
> by Flink and vice versa.
> 4. Support Hive UDFs - For all of Hive's native UDFs, Flink either provides its 
> own implementation or makes Hive's implementation work in Flink. Further, for 
> user-created UDFs in Hive, Flink SQL should provide a mechanism allowing users 
> to import them into Flink without any code change required.
> 5. Data types - Flink SQL should support all data types that are available 
> in Hive.
> 6. SQL Language - Flink SQL should support the SQL standard (such as SQL:2003) 
> with extensions to support Hive's syntax and language features, around DDL, 
> DML, and SELECT queries.
> 7. SQL CLI - this is currently being developed in Flink but more effort is needed.
> 8. Server - provide a server that's compatible with Hive's HiveServer2 in its 
> Thrift APIs, such that HiveServer2 users can reuse their existing clients 
> (such as Beeline) but connect to Flink's Thrift server instead.
> 9. JDBC/ODBC drivers - Flink may provide its own JDBC/ODBC drivers for other 
> applications to use to connect to its Thrift server.
> 10. Support other user customizations in Hive, such as Hive SerDes, storage 
> handlers, etc.
> 11. Better task failure tolerance and task scheduling in the Flink runtime.
> 
> As you can see, achieving all of those requires significant effort across 
> all layers of Flink. However, a short-term goal could include only core 
> areas (such as 1, 2, 4, 5, 6, 7) or start at a smaller scope (such as #3, 
> #6).
> 
> Please share your further thoughts. If we generally agree that this is the 
> right direction, I could come up with a formal proposal quickly and then we 
> can follow up with broader discussions.
> 
> Thanks,
> Xuefu
> 
> 
> 
> --
> Sender:vino yang 
> Sent at:2018 Oct 11 (Thu) 09:45
> Recipient:Fabian Hueske 
> Cc:dev ; Xuefu ; user 
> 
> Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
> 
> Hi Xuefu,
> 
> I appreciate this proposal, and like Fabian, I think it would be better if you 
> could give more details of the plan.
> 
> Thanks, vino.
> 
> Fabian Hueske  wrote on Wed, Oct 10, 2018 at 5:27 PM:
> Hi Xuefu,
> 
> Welcome to the Flink community and thanks for starting this discussion! 
> Better Hive integration would be really great!
> Can you go into details of what you are proposing? I can think of a couple 
> ways to improve Flink in that regard:
> 
> * Support for Hive UDFs
> * Support for Hive metadata catalog
> * Support for HiveQL syntax
> * ???
> 
> Best, Fabian
> 
> On Tue, Oct 9, 2018 at 19:22, Zhang, Xuefu 
> wrote:
> Hi all,
> 
> Along with the community's effort, inside Alibaba we have explored Flink's 
> potential as an execution engine not just for stream processing but also for 
> batch processing. We are encouraged by our findings and have initiated our 
> effort to make Flink's SQL capabilities full-fledged. When comparing what's 
> available in Flink to the offerings from competitive data processing engines, 
> we identified a major gap in Flink: good integration with the Hive ecosystem. 
> This is crucial to the success of Flink SQL and batch due to the 
> well-established data ecosystem around Hive. Therefore, we have done some 
> initial work along this direction, but a lot of effort is still needed.
> 
> We have two strategies in mind. The first one is to make Flink SQL 
> full-fledged and well-integrated with the Hive ecosystem. This is a similar 
> approach to what Spark SQL adopted. The second strategy is to make Hive 
> itself 

Issue while running integration test using AbstractTestBase

2018-10-10 Thread James Isaac
Hi,

I'm trying to run an integration test of my flink application. My test code
looks like this:

public class HttpsCsvIngestorTest extends AbstractTestBase {

    private final static Logger LOG = LoggerFactory.getLogger(HttpsCsvIngestorTest.class);

    @Test
    public void testHttpsCsvIngestion() throws Exception {

        Thread flinkJob = new Thread(new Runnable() {
            @Override
            public void run() {
                String[] args = new String[] { "--configFile",
                        "src/test/resources/config.properties", "--secretKey", "12345" };
                JobExecutionResult execResult = CsvProcessorFlinkDriver.runFlinkJob(args);
            }
        });

        flinkJob.start();
        LOG.info("Starting flink job");

        Thread.sleep(1);
        String[] args2 = new String[] { "localhost", filename };
        FileUploadClient.main(args2);

        assertTrue(new File(System.getProperty("user.dir") + File.separator
                + "src/main/resources/Result.csv").exists());
        System.out.println("Test completed. Going to shutdown flink job");
    }
}


Here I'm starting my flink application from a child thread, and uploading a
file for processing from the main thread. The test runs fine, and I get the
expected result file.
However, I get the following error at the end, when the application is
being shut down:

2018-10-10 16:24:46,670 ERROR Source: JettyServerFileSource -> Map ->
Process -> Sink: Unnamed (1/1) StreamTask StreamTask.java:477  - Error
during disposal of stream operator.
java.lang.NoSuchMethodError:
org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.dispose(DefaultOperatorStateBackend.java:174)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:330)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:745)
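
(The same error is logged several more times, once per operator in the chained task.)

A minimal sketch for narrowing this down — IOUtils.closeQuietly(Closeable) only 
exists from commons-io 2.0 on, so this NoSuchMethodError usually means an older 
commons-io 1.x jar is on the test classpath and shadows the newer one. Printing 
where the class is loaded from shows which jar wins:

import org.apache.commons.io.IOUtils

// Prints the jar that provides IOUtils at runtime; check its commons-io version.
println(classOf[IOUtils].getProtectionDomain.getCodeSource.getLocation)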

Re: No data issued by flink window after a few hours

2018-10-10 Thread 潘 功森
Hi,
I changed the configuration as below, and it looked fine when the job started.
But no results are issued when the window ends after the job has been running for 
about six hours, and there are no errors or exceptions.
How can I narrow down the problem?

Yours,
September


From: 潘 功森 
Sent: Wednesday, October 10, 2018 2:44:48 PM
To: vino yang
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,

Because the default state size is too small for one hour of data, and the max window 
size is 24 hours, I used 500 MB.


MemoryStateBackend stateBackend = new MemoryStateBackend(MAX_STATE_SIZE);//500M
env.setStateBackend(stateBackend);

And I found that, irrespective of the configured maximal state size, the state cannot 
be larger than the akka frame size.
So I added a config in flink-conf.yaml:
akka.framesize: 524288000b

What else do I have to pay attention to?

Yours,
September


From: vino yang 
Sent: Wednesday, October 10, 2018 11:45:31 AM
To: pangong...@hotmail.com
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,

I saw the exception image you provided. Based on the exception message, it 
seems you used the default max state size (5MB).

You can specify the max state size to override the default value. Try :

MemoryStateBackend stateBackend = new MemoryStateBackend(theSizeOfBytes);

Please note that you need to reserve enough memory for Flink.
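
If the window state really needs hundreds of MB, a minimal alternative sketch is 
to move off the memory backend entirely, so that large state goes to files on 
checkpoints instead of being shipped through akka. Here env is the 
StreamExecutionEnvironment, the paths are placeholders, and the RocksDB variant 
assumes the flink-statebackend-rocksdb dependency is on the classpath:

import org.apache.flink.runtime.state.filesystem.FsStateBackend
// import org.apache.flink.contrib.streaming.state.RocksDBStateBackend

env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"))
// or: env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"))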

Thanks, vino.

潘 功森 mailto:pangong...@hotmail.com>> wrote on Wed, Oct 10, 2018 
at 11:36 AM:
Please have a look about my last mail.

What should I do when the cached window data is too large?

Yours,
September


From: vino yang mailto:yanghua1...@gmail.com>>
Sent: Wednesday, October 10, 2018 11:33:48 AM
To: pangong...@hotmail.com
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,

Did you mean "computer memory", referring to the MemoryStateBackend?
The Flink window mechanism is internally based on State, and this is done for 
fault tolerance.
If you introduce external storage, it will break its design and bring other 
problems.

Thanks, vino.

潘 功森 mailto:pangong...@hotmail.com>> wrote on Wed, Oct 10, 2018 
at 11:02 AM:
Hi,
"ram to cache the distinct data about sliding window" means I used computer 
momery not the third part db to cache the data need used in window.
“the data need used in window” means :such as the sliding window is 1 hour, and 
I need to count the distinct users, I need to cache the user id about one hour.
Cause there’re no related errors.
Yours,
September


From: vino yang mailto:yanghua1...@gmail.com>>
Sent: Wednesday, October 10, 2018 10:49:43 AM
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,

Can you explain what "ram to cache the distinct data about sliding window" means?
The information you provided is too sparse for others to help you analyze the 
problem and give advice.

In addition, for questions about using Flink, please send mail only to the user 
mailing list.
The dev mailing list is mainly used to discuss development-related issues.

Thanks vino.

潘 功森 mailto:pangong...@hotmail.com>> wrote on Wed, Oct 10, 2018 
at 10:37 AM:
Hi all,
I used a Flink window, and when the job begins we get the window results. But 
there are no results issued after a few hours.
I found the job is still running with no errors, and data that does not go through 
a window can still be emitted.
By the way, I used Flink 1.3.2 and RAM to cache the distinct data for the sliding 
window.

Yours,
September


Re: Getting NoMethod found error while running job on flink 1.6.1

2018-10-10 Thread vino yang
Hi Chandu,

What mode does your Flink run in?
In addition, can you check if the flink-metrics-core is included in the
classpath of the Flink runtime environment?

Thanks, vino.

Chandu Kempaiah  wrote on Thu, Oct 11, 2018 at 9:51 AM:

>
> Hello,
>
> I have a job that reads messages from Kafka, processes them and writes
> back to Kafka. This job works fine on Flink 1.3.2. I upgraded the cluster to
> 1.6.1 but now see the below error. Has anyone faced a similar issue?
>
> I have updated all the dependencies to use
>
> <flink.version>1.6.1</flink.version>
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kafka-0.10_${scala.version}</artifactId>
>     <version>${flink.version}</version>
> </dependency>
>
>
> java.lang.NoSuchMethodError: 
> org.apache.flink.metrics.MetricGroup.addGroup(Ljava/lang/String;Ljava/lang/String;)Lorg/apache/flink/metrics/MetricGroup;
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.registerOffsetMetrics(AbstractFetcher.java:622)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:200)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:91)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:64)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:209)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:647)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
>
>
>
> Thanks
>
> Chandu
>
>


Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-10 Thread Zhang, Xuefu
Hi Fabian/Vino,

Thank you very much for your encouragement and inquiry. Sorry that I didn't see 
Fabian's email until I read Vino's response just now. (Somehow Fabian's went to 
the spam folder.)

My proposal contains long-term and short-term goals. Nevertheless, the effort 
will focus on the following areas, including Fabian's list:

1. Hive metastore connectivity - This covers both read/write access, which 
means Flink can make full use of Hive's metastore as its catalog (at least for 
batch, but this can be extended to streaming as well).
2. Metadata compatibility - Objects (databases, tables, partitions, etc.) 
created by Hive can be understood by Flink, and the reverse direction is true 
as well.
3. Data compatibility - Similar to #2, data produced by Hive can be consumed by 
Flink and vice versa.
4. Support Hive UDFs - For all of Hive's native UDFs, Flink either provides its 
own implementation or makes Hive's implementation work in Flink. Further, for 
user-created UDFs in Hive, Flink SQL should provide a mechanism allowing users 
to import them into Flink without any code change required.
5. Data types - Flink SQL should support all data types that are available in 
Hive.
6. SQL Language - Flink SQL should support the SQL standard (such as SQL:2003) 
with extensions to support Hive's syntax and language features, around DDL, 
DML, and SELECT queries.
7. SQL CLI - this is currently being developed in Flink but more effort is needed.
8. Server - provide a server that's compatible with Hive's HiveServer2 in its 
Thrift APIs, such that HiveServer2 users can reuse their existing clients (such 
as Beeline) but connect to Flink's Thrift server instead.
9. JDBC/ODBC drivers - Flink may provide its own JDBC/ODBC drivers for other 
applications to use to connect to its Thrift server.
10. Support other user customizations in Hive, such as Hive SerDes, storage 
handlers, etc.
11. Better task failure tolerance and task scheduling in the Flink runtime.

As you can see, achieving all of those requires significant effort across all 
layers of Flink. However, a short-term goal could include only core areas 
(such as 1, 2, 4, 5, 6, 7) or start at a smaller scope (such as #3, #6).

Please share your further thoughts. If we generally agree that this is the 
right direction, I could come up with a formal proposal quickly and then we can 
follow up with broader discussions.

Thanks,
Xuefu




--
Sender:vino yang 
Sent at:2018 Oct 11 (Thu) 09:45
Recipient:Fabian Hueske 
Cc:dev ; Xuefu ; user 

Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Hi Xuefu,

I appreciate this proposal, and like Fabian, I think it would be better if you could 
give more details of the plan.

Thanks, vino.
Fabian Hueske  wrote on Wed, Oct 10, 2018 at 5:27 PM:

Hi Xuefu,

Welcome to the Flink community and thanks for starting this discussion! Better 
Hive integration would be really great!
Can you go into details of what you are proposing? I can think of a couple ways 
to improve Flink in that regard:

* Support for Hive UDFs
* Support for Hive metadata catalog
* Support for HiveQL syntax
* ???

Best, Fabian

On Tue, Oct 9, 2018 at 19:22, Zhang, Xuefu 
wrote:
Hi all,

 Along with the community's effort, inside Alibaba we have explored Flink's 
potential as an execution engine not just for stream processing but also for 
batch processing. We are encouraged by our findings and have initiated our 
effort to make Flink's SQL capabilities full-fledged. When comparing what's 
available in Flink to the offerings from competitive data processing engines, 
we identified a major gap in Flink: good integration with the Hive ecosystem. 
This is crucial to the success of Flink SQL and batch due to the 
well-established data ecosystem around Hive. Therefore, we have done some 
initial work along this direction, but a lot of effort is still needed.

 We have two strategies in mind. The first one is to make Flink SQL 
full-fledged and well-integrated with the Hive ecosystem. This is a similar 
approach to what Spark SQL adopted. The second strategy is to make Hive itself 
work with Flink, similar to the proposal in [1]. Each approach bears its pros 
and cons, but they don’t need to be mutually exclusive, with each targeting 
different users and use cases. We believe that both will promote a much greater 
adoption of Flink beyond stream processing.

 We have been focused on the first approach and would like to showcase Flink's 
batch and SQL capabilities with Flink SQL. However, we have also planned to 
start strategy #2 as the follow-up effort.

 I'm completely new to Flink (with a short bio [2] below), though many of my 
colleagues here at Alibaba are long-time contributors. Nevertheless, I'd like 
to share our thoughts and invite your early feedback. At the same time, I am 
working on a detailed proposal on Flink SQL's integration with the Hive 
ecosystem, which will also be shared when ready.

 While the ideas are simple, each 

Re: Partitions vs. Subpartitions

2018-10-10 Thread Kurt Young
Hi,

A partition is the output of a JobVertex, which you can simply think of as
containing an operator. In the real world, a JobVertex runs in parallel, and
each parallel instance outputs some data, which is conceptually called a
subpartition.

Best,
Kurt


On Thu, Oct 11, 2018 at 10:27 AM Renjie Liu  wrote:

> Hi, Chris:
>
> Where are these words from? Are they from flink source code?
>
> On Wed, Oct 10, 2018 at 10:18 PM Chris Miller  wrote:
>
>> Hi,
>>
>> what's the difference between partitions and subpartitions?
>>
>>
>>
>> Thanks.
>>
>>
>>
>> CM
>>
>>
>
>
> --
> Renjie Liu
> Software Engineer, MVAD
>


Re: Partitions vs. Subpartitions

2018-10-10 Thread Renjie Liu
Hi, Chris:

Where are these words from? Are they from flink source code?

On Wed, Oct 10, 2018 at 10:18 PM Chris Miller  wrote:

> Hi,
>
> what's the difference between partitions and subpartitions?
>
>
>
> Thanks.
>
>
>
> CM
>
>


-- 
Renjie Liu
Software Engineer, MVAD


Fwd: Getting NoMethod found error while running job on flink 1.6.1

2018-10-10 Thread Chandu Kempaiah
Hello,

I have a job that reads messages from Kafka, processes them and writes
back to Kafka. This job works fine on Flink 1.3.2. I upgraded the cluster to
1.6.1 but now see the below error. Has anyone faced a similar issue?

I have updated all the dependencies to use

<flink.version>1.6.1</flink.version>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

java.lang.NoSuchMethodError:
org.apache.flink.metrics.MetricGroup.addGroup(Ljava/lang/String;Ljava/lang/String;)Lorg/apache/flink/metrics/MetricGroup;
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.registerOffsetMetrics(AbstractFetcher.java:622)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:200)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:91)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:64)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:209)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:647)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)



Thanks

Chandu


Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-10 Thread vino yang
Hi Xuefu,

I appreciate this proposal, and like Fabian, I think it would be better if you
could give more details of the plan.

Thanks, vino.

Fabian Hueske  wrote on Wed, Oct 10, 2018 at 5:27 PM:

> Hi Xuefu,
>
> Welcome to the Flink community and thanks for starting this discussion!
> Better Hive integration would be really great!
> Can you go into details of what you are proposing? I can think of a couple
> ways to improve Flink in that regard:
>
> * Support for Hive UDFs
> * Support for Hive metadata catalog
> * Support for HiveQL syntax
> * ???
>
> Best, Fabian
>
> On Tue, Oct 9, 2018 at 19:22, Zhang, Xuefu <
> xuef...@alibaba-inc.com> wrote:
>
>> Hi all,
>>
>> Along with the community's effort, inside Alibaba we have explored
>> Flink's potential as an execution engine not just for stream processing but
>> also for batch processing. We are encouraged by our findings and have
>> initiated our effort to make Flink's SQL capabilities full-fledged. When
>> comparing what's available in Flink to the offerings from competitive data
>> processing engines, we identified a major gap in Flink: good integration
>> with the Hive ecosystem. This is crucial to the success of Flink SQL and batch
>> due to the well-established data ecosystem around Hive. Therefore, we have
>> done some initial work along this direction, but a lot of effort is still
>> needed.
>>
>> We have two strategies in mind. The first one is to make Flink SQL
>> full-fledged and well-integrated with the Hive ecosystem. This is a similar
>> approach to what Spark SQL adopted. The second strategy is to make Hive
>> itself work with Flink, similar to the proposal in [1]. Each approach bears
>> its pros and cons, but they don’t need to be mutually exclusive, with each
>> targeting different users and use cases. We believe that both will
>> promote a much greater adoption of Flink beyond stream processing.
>>
>> We have been focused on the first approach and would like to showcase
>> Flink's batch and SQL capabilities with Flink SQL. However, we have also
>> planned to start strategy #2 as the follow-up effort.
>>
>> I'm completely new to Flink (with a short bio [2] below), though many of
>> my colleagues here at Alibaba are long-time contributors. Nevertheless, I'd
>> like to share our thoughts and invite your early feedback. At the same
>> time, I am working on a detailed proposal on Flink SQL's integration with
>> the Hive ecosystem, which will also be shared when ready.
>>
>> While the ideas are simple, each approach will demand significant effort,
>> more than what we can afford. Thus, the input and contributions from the
>> communities are greatly welcome and appreciated.
>>
>> Regards,
>>
>>
>> Xuefu
>>
>> References:
>>
>> [1] https://issues.apache.org/jira/browse/HIVE-10712
>> [2] Xuefu Zhang is a long-time open source veteran, worked or working on
>> many projects under Apache Foundation, of which he is also an honored
>> member. About 10 years ago he worked in the Hadoop team at Yahoo where the
>> projects just got started. Later he worked at Cloudera, initiating and
>> leading the development of Hive on Spark project in the communities and
>> across many organizations. Prior to joining Alibaba, he worked at Uber
>> where he promoted Hive on Spark to all Uber's SQL on Hadoop workload and
>> significantly improved Uber's cluster efficiency.
>>
>>
>>


Taskmanager times out continuously for registration with Jobmanager

2018-10-10 Thread Abdul Qadeer
Hi,


We are facing an issue in standalone HA mode in Flink 1.4.0 where
Taskmanager restarts and is not able to register with the Jobmanager. It
times out awaiting the AcknowledgeRegistration/AlreadyRegistered message from
the Jobmanager actor and keeps sending RegisterTaskManager messages. The logs
at the Jobmanager don’t show anything about a registration failure/request. It
doesn’t print log.debug(s"RegisterTaskManager: $msg") (from
JobManager.scala) either. The network connection between taskmanager and
jobmanager seems fine; tcpdump shows message sent to jobmanager and TCP ACK
received from jobmanager. Note that the communication is happening between
docker containers.


Following are the logs from Taskmanager:



{"timeMillis":1539189572438,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
to register at JobManager akka.tcp://
flink@192.168.83.51:6123/user/jobmanager (attempt 1400, timeout: 3
milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":48,"threadPriority":5}

{"timeMillis":1539189580229,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got
ping response for sessionid: 0x1260ea5002d after
0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}

{"timeMillis":1539189600247,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got
ping response for sessionid: 0x1260ea5002d after
0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}

{"timeMillis":1539189602458,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
to register at JobManager akka.tcp://
flink@192.168.83.51:6123/user/jobmanager (attempt 1401, timeout: 3
milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":48,"threadPriority":5}

{"timeMillis":1539189620251,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got
ping response for sessionid: 0x1260ea5002d after
0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}

{"timeMillis":1539189632478,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
to register at JobManager akka.tcp://
flink@192.168.83.51:6123/user/jobmanager (attempt 1402, timeout: 3
milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":48,"threadPriority":5}


Re: Watermark on keyed stream

2018-10-10 Thread Elias Levy
You are correct that watermarks are not tracked per key.  You are dealing
with events with a high degree of delay variability.  That is usually not a
good match for event time processing as implemented in Flink.

You could use event time processing and configure a very large allowed
lateness on your windows (days in your case), but that would significantly increase
the amount of state you must track.  That may be acceptable depending on
your message volume, scale of deployment, and state and timer storage
backend (RocksDB).
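
A minimal sketch of that approach — the SensorReading type, field names, window
size, and lateness values are all illustrative:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class SensorReading(sensorId: String, timestamp: Long, value: Double)

val tooLate = OutputTag[SensorReading]("too-late")

// readings: DataStream[SensorReading] with timestamps/watermarks already assigned
val windowed = readings
  .keyBy(_.sensorId)
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .allowedLateness(Time.days(3))   // window state is kept around for late data
  .sideOutputLateData(tooLate)     // anything later than 3 days goes to a side output
  .reduce((a, b) => if (a.value > b.value) a else b)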


On Wed, Oct 10, 2018 at 11:12 AM Nick Triller 
wrote:

> Hi everyone,
>
>
>
> it seems Flink only supports global watermarks currently which is a
> problem for my use case.
>
> Many sensors send data which might be buffered for days in upstream
> systems before arriving at the Flink job.
>
> The job keys the stream by sensor. If other sensors send values in the
> meantime, the global watermark is advanced
>
> and buffered data that arrives late is dropped.
>
>
>
> How could the issue be solved? I guess it would be possible to calculate
> the watermark manually and add it to a wrapper object,
>
> but I am not sure how to correctly implement windowing (tumbling window)
> then.
>
>
>
> Thank you in advance for any ideas.
>
>
>
> Regards,
>
> Nick
>


Watermark on keyed stream

2018-10-10 Thread Nick Triller
Hi everyone,

it seems Flink only supports global watermarks currently which is a problem for 
my use case.
Many sensors send data which might be buffered for days in upstream systems 
before arriving at the Flink job.
The job keys the stream by sensor. If other sensors send values in the 
meantime, the global watermark is advanced
and buffered data that arrives late is dropped.

How could the issue be solved? I guess it would be possible to calculate the 
watermark manually and add it to a wrapper object,
but I am not sure how to correctly implement windowing (tumbling window) then.

Thank you in advance for any ideas.

Regards,
Nick


Re: Flink 1.5.2 - excessive amount of container requests, Received new/Returning excess container "flood"

2018-10-10 Thread Gary Yao
Hi Borys,

I remember that another user reported a similar issue recently [1] –
attached
to the ticket you can find his log file. If I recall correctly, we concluded
that YARN returned the containers very quickly. At the time, Flink's debug
level logs were inconclusive because we did not log the reason why the
container was returned, and the user could not provide us the YARN logs. In
1.5.4, we improved the logging [2]. Hence, it would be good if you can
reproduce this with debug level logging using Flink 1.5.4.

You could also try the vanilla Hadoop distribution, or Flink 1.6.

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-10104
[2] https://issues.apache.org/jira/browse/FLINK-10137

On Tue, Oct 9, 2018 at 5:55 PM Borys Gogulski 
wrote:

> Hey guys,
>
> thanks for the replies.
> 1. "Requesting new TaskExecutor" looks fine as it's exactly 32 as is jobs'
> parallelism set.
> The weird thing is that after those 32 containers requested and received we
> have this "flood" of 'Received new container/Returning excess container`
> (and as shown below it's actually doing something on YARN side)
> Where does those come from?
> 2. I felt that DEBUG will be needed, we'll see what we can do about it.
> 3. Yes, all in favor for upgrading to 1.5.4. But as Gary mentioned there
> seems to be no fixes that could heal it (I was reading release notes
> previous to posting this thread ; )).
> 4. Hadoop: 2.6.0+cdh5.14.0
>
> Here are logs for one of "excess" containers:
> 1. Flink JM
> 2018-10-09 17:35:33,493 INFO  org.apache.flink.yarn.YarnResourceManager
>
> - Received new container: container_e96_1538374332137_3071_01_2485560 -
> Remaining pending container requests: 0
> 2018-10-09 17:35:33,493 INFO  org.apache.flink.yarn.YarnResourceManager
>
> - Returning excess container container_e96_1538374332137_3071_01_2485560.
> 2. YARN
> 2018-10-09 17:35:33,283 INFO
> org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl:
> container_e96_1538374332137_3071_01_2485560 Container Transitioned from NEW
> to ALLOCATED
> 2018-10-09 17:35:33,283 INFO
> org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=user
>
> OPERATION=AM Allocated ContainerTARGET=SchedulerApp
> RESULT=SUCCESS  APPID=application_1538374332137_3071
> CONTAINERID=container_e96_1538374332137_3071_01_2485560
> 2018-10-09 17:35:33,283 INFO
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode:
> Assigned container container_e96_1538374332137_3071_01_2485560 of capacity
>  on host server:44142, which has 5 containers,
>  used and  available after
> allocation
> 2018-10-09 17:35:33,283 INFO
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue:
> assignedContainer application attempt=appattempt_1538374332137_3071_01
> container=Container: [ContainerId:
> container_e96_1538374332137_3071_01_2485560, NodeId: server:44142,
> NodeHttpAddress: server:8042, Resource: , Priority:
> 0, Token: null, ] queue=queue: capacity=0.5, absoluteCapacity=0.5,
> usedResources=, usedCapacity=1.9947916,
> absoluteUsedCapacity=0.9973958, numApps=2, numContainers=383
> clusterResource=
> 2018-10-09 17:35:33,485 INFO
> org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl:
> container_e96_1538374332137_3071_01_2485560 Container Transitioned from
> ALLOCATED to ACQUIRED
> 2018-10-09 17:35:38,532 INFO
> org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl:
> container_e96_1538374332137_3071_01_2485560 Container Transitioned from
> ACQUIRED to RELEASED
> 2018-10-09 17:35:38,532 INFO
>
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp:
> Completed container: container_e96_1538374332137_3071_01_2485560 in state:
> RELEASED event:RELEASED
> 2018-10-09 17:35:38,532 INFO
> org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=user
>
> IP=ip  OPERATION=AM Released Container TARGET=SchedulerApp
> RESULT=SUCCESS  APPID=application_1538374332137_3071
> CONTAINERID=container_e96_1538374332137_3071_01_2485560
> 2018-10-09 17:35:38,532 INFO
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode:
> Released container container_e96_1538374332137_3071_01_2485560 of capacity
>  on host server:44142, which currently has 0
> containers,  used and 
> available, release resources=true
> 2018-10-09 17:35:38,532 INFO
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue:
> completedContainer container=Container: [ContainerId:
> container_e96_1538374332137_3071_01_2485560, NodeId: server:44142,
> NodeHttpAddress: server:8042, Resource: , Priority:
> 0, Token: Token { kind: ContainerToken, service: ip:44142 }, ] queue=queue:
> capacity=0.5, absoluteCapacity=0.5, usedResources= vCores:96>, usedCapacity=0.5, absoluteUsedCapacity=0.25, numApps=2,
> numContainers=96 cluster=
> 2018-10-09 17:35:38,532 INFO
>
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler:
> 

Re: [deserialization schema] skip data, that couldn't be properly deserialized

2018-10-10 Thread Rinat
Hi Fabian, I have created the issue, 
https://issues.apache.org/jira/browse/FLINK-10525

Thx !
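
For context, a minimal sketch of the kind of wrapper described in the quoted mail 
below — the class name, the wrapped inner schema, and the decision to silently 
return null are all illustrative, and whether a null record is actually dropped 
still depends on the connector, which is what the issue is about:

import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation

class SkipCorruptRecordsSchema[T](inner: DeserializationSchema[T])
    extends DeserializationSchema[T] {

  // Delegate to the wrapped schema, but turn any deserialization failure
  // into a null record instead of failing the job.
  override def deserialize(message: Array[Byte]): T =
    try inner.deserialize(message)
    catch { case _: Exception => null.asInstanceOf[T] }

  override def isEndOfStream(nextElement: T): Boolean = inner.isEndOfStream(nextElement)

  override def getProducedType: TypeInformation[T] = inner.getProducedType
}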

> On 10 Oct 2018, at 16:47, Fabian Hueske  wrote:
> 
> Hi Rinat,
> 
> Thanks for discussing this idea. Yes, I think this would be a good feature. 
> Can you open a Jira issue and describe the feature?
> 
> Thanks, Fabian
> 
> Am Do., 4. Okt. 2018 um 19:28 Uhr schrieb Rinat  >:
> Hi mates, in accordance with the contract of 
> org.apache.flink.formats.avro.DeserializationSchema, it should return a null 
> value when content couldn’t be deserialized.
> But in most cases (for example 
> org.apache.flink.formats.avro.AvroDeserializationSchema) the method fails if data 
> is corrupted. 
> 
> We’ve implemented our own SerDe class that returns null if data doesn’t 
> satisfy the Avro schema, but it’s rather hard to maintain this functionality 
> during migration to the latest Flink version.
> What do you think — maybe it would be useful if we supported optional skipping of 
> failed records in the Avro and other deserializers in the source code?
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru 
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Partitions vs. Subpartitions

2018-10-10 Thread Chris Miller
 

Hi, 

what's the difference between partitions and subpartitions? 

Thanks. 

CM 
 

Re: Identifying missing events in keyed streams

2018-10-10 Thread Fabian Hueske
Hi Averell,

I'd go with approach 2). As of Flink 1.6.0 you can delete timers.

But even if you are on a pre-1.6 version, a ProcessFunction would be the
way to go, IMO.
You don't need to register a timer for each event.
Instead, you can register the first timer with the first event and have a
state that is updated with the timestamp of the last seen event.
When the timer fires, you check if you need to raise an alert and
register a new timer such that it fires 1 minute after the last seen event
(i.e., at last-seen + 1 minute).
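
A minimal sketch of that pattern — the SensorEvent type, the one-minute gap, and
the alert payload are illustrative, and the input stream is assumed to be keyed
by sensor id:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

case class SensorEvent(sensorId: String, timestamp: Long)

class MissingEventAlert extends ProcessFunction[SensorEvent, String] {
  private val gap = 60 * 1000L  // raise an alert after one minute without events

  private lazy val lastSeen: ValueState[java.lang.Long] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("lastSeen", classOf[java.lang.Long]))

  override def processElement(event: SensorEvent,
                              ctx: ProcessFunction[SensorEvent, String]#Context,
                              out: Collector[String]): Unit = {
    val ts: Long = ctx.timestamp()  // requires event-time timestamps to be assigned
    // Only the first event for a key registers a timer; later events just
    // refresh the last-seen timestamp.
    if (lastSeen.value() == null) {
      ctx.timerService().registerEventTimeTimer(ts + gap)
    }
    lastSeen.update(ts)
  }

  override def onTimer(timestamp: Long,
                       ctx: ProcessFunction[SensorEvent, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    val last: Long = lastSeen.value()
    if (timestamp - last >= gap) {
      out.collect(s"no event since $last (checked at $timestamp)")
    } else {
      // An event arrived in the meantime: re-arm the timer one gap after it.
      ctx.timerService().registerEventTimeTimer(last + gap)
    }
  }
}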

Best, Fabian

Am Do., 4. Okt. 2018 um 16:15 Uhr schrieb Averell :

> Hi everyone,
>
> I have a keyed stream which is expecting events every fixed interval (let's
> say 1 minute). I want to raise alarms for any key which has received no
> events in n periods. What would be the cheapest way (in terms of
> performance) to do this?
> I thought of some solutions, but don't know which one is the best:
> 1. Sliding window then count the number of events in each window <<< this
> seems quite expensive when n is big.
> 2. Register a timer for every single event, record the last event timestamp
> and check that timestamp when the timer expires. (This would be the best if
> there's an option to cancel/modify a timer, but it seems that feature is
> not
> available yet)
> 3. Session window: I haven't implemented this to verify its feasibility.
> Thinking of firing the alarm on every window clear event.
> 4. CEP. I don't know whether it's possible or not. Haven't found a guide
> for
> defining patterns of missing events.
>
> Could you please give some advices?
>
> Thanks and best regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: [deserialization schema] skip data, that couldn't be properly deserialized

2018-10-10 Thread Fabian Hueske
Hi Rinat,

Thanks for discussing this idea. Yes, I think this would be a good feature.
Can you open a Jira issue and describe the feature?

Thanks, Fabian

Am Do., 4. Okt. 2018 um 19:28 Uhr schrieb Rinat :

> Hi mates, in accordance with the contract of
> org.apache.flink.formats.avro.DeserializationSchema, it should return a null
> value when content couldn’t be deserialized.
> But in most cases (for example org.apache.flink.formats.avro.
> AvroDeserializationSchema) the method fails if data is corrupted.
>
> We’ve implemented our own SerDe class that returns null if data doesn’t
> satisfy the Avro schema, but it’s rather hard to maintain this functionality
> during migration to the latest Flink version.
> What do you think — maybe it would be useful if we supported optional skipping
> of failed records in the Avro and other deserializers in the source code?
>
> Sincerely yours,
> *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>
> email: r.shari...@cleverdata.ru 
> mobile: +7 (925) 416-37-26
>
> CleverDATA
> make your data clever
>
>


Re: Large rocksdb state restore/checkpoint duration behavior

2018-10-10 Thread Stefan Richter
Hi,

I would assume that the problem about blocked processing during a checkpoint is 
caused by [1], because you mentioned the use of RocksDB incremental checkpoints 
and it could be that you use it in combination with heap-based timers. This is 
the one combination that currently still uses a synchronous checkpointing path 
for the timers, and if you have many timers, this can block the pipeline.

For the cancellation problem, as seen in the stack trace, I would assume it is 
because of [2]. In a nutshell: if the wall clock or event time changes, 
multiple timers can trigger (it can be a lot, also depending on how big the 
change is) and currently this loop does not check the task’s cancellation 
status and will only terminate when all onTimer calls have been handled.

If you have problems with slow save points, you can also try to restore from 
the externalised handle of an incremental checkpoint and see if this works 
better.

Best,
Stefan

[1] https://issues.apache.org/jira/browse/FLINK-10026 

[2] https://issues.apache.org/jira/browse/FLINK-9845

> On 10. Oct 2018, at 12:39, Aminouvic  wrote:
> 
> Hi,
> 
> We are using Flink 1.6.1 on yarn with rocksdb as backend incrementally
> checkpointed to hdfs (for data and timers).
> 
> The job reads events from Kafka (~1 billion events per day), constructs user
> sessions using an EventTimeSessionWindow coupled with a late-firing trigger
> and a WindowFunction with AggregatingState (a few minutes gap, 1 day allowed
> lateness, ~1 TB state) to produce results back into Kafka (~200 million
> events per day).
> 
> When trying to restart the job for maintenance (stopped the cluster for 1
> hour), the restore duration took several hours.
> 
> Task metrics showed that no new data was read from Kafka, but the job
> produced data out.
> 
> Also, sometimes, the job seems to freeze (no data in/out) while performing
> long checkpoints (~20 minutes)
> 
> When we try to cancel the job it takes several minutes before stopping and
> logs show the following :
> :
> 2018-10-09 11:53:53,348 WARN  org.apache.flink.runtime.taskmanager.Task   
>  
> - Task 'window-function -> (Sink: kafka-sink, Sink: kafka-late-sink) (1/60)'
> did not react to cancelling signal for 30 seconds, but is stuck in method:
> org.rocksdb.RocksDB.get(Native Method)
> org.rocksdb.RocksDB.get(RocksDB.java:810)
> org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:120)
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112)
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:452)
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> java.lang.Thread.run(Thread.java:745)
> 
> Any ideas on this ?
> 
> Regards,
> Amine
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Job manager logs for previous YARN attempts

2018-10-10 Thread Gary Yao
Hi Pawel,

As far as I know, the application attempt is incremented if the application
master fails and a new one is brought up. Therefore, what you are seeing
should not happen. I have just deployed on AWS EMR 5.17.0 (Hadoop 2.8.4) and
killed the container running the application master – the container id was
not
reused. Can you describe how to reproduce this behavior? Do you have a
sample
application? Can you observe this behavior consistently? Can you share the
complete output of

yarn logs -applicationId ?

The call to the method setKeepContainersAcrossApplicationAttempts is needed
to
enable recovery of previously allocated TaskManager containers [1]. I
currently do not see how it is possible to keep the AM container across
application attempts.

> The second challenge is understanding if the job will be restored into new
> application attempts or new application attempt will just have flink
running
> without any job?

The job will be restored if you have HA enabled [2][3].

Best,
Gary

[1]
https://hortonworks.com/blog/apache-hadoop-yarn-hdp-2-2-fault-tolerance-features-long-running-services/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/jobmanager_high_availability.html#yarn-cluster-high-availability
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/yarn_setup.html#recovery-behavior-of-flink-on-yarn

On Mon, Oct 8, 2018 at 12:32 PM Pawel Bartoszek 
wrote:

> Hi,
>
> I am looking into why YARN starts a new application attempt on Flink
> 1.5.2. The challenge is getting the logs for the first attempt. After
> checking YARN I discovered that in the first attempt and the second one the
> application master (job manager) gets assigned the same container id (is
> this expected?). In this case, are the logs from the first attempt overwritten?
> I found that setKeepContainersAcrossApplicationAttempts is enabled here.
>
> The second challenge is understanding whether the job will be restored in the
> new application attempt, or whether the new application attempt will just have
> Flink running without any job.
>
>
> Regards,
> Pawel
>
> *First attempt:*
>
> pawel_bartoszek@ip-10-4-X-X ~]$ yarn container -list
> appattempt_1538570922803_0020_01
> 18/10/08 10:16:16 INFO client.RMProxy: Connecting to ResourceManager at
> ip-10-4-X-X.eu-west-1.compute.internal/10.4.108.26:8032
> Total number of containers :1
>   Container-Id   Start Time  Finish Time
>  State HostNode Http Address
> LOG-URL
> container_1538570922803_0020_02_01 Mon Oct 08 09:47:17 + 2018
>  N/A  RUNNING
> ip-10-4-X-X.eu-west-1.compute.internal:8041
> http://ip-10-4-X-X.eu-west-1.compute.internal:8042
> http://ip-10-4-X-X.eu-west-1.compute.internal:8042/node/containerlogs/container_1538570922803_0020_02_01/pawel_bartoszek
>
> *Second attempt:*
> [pawel_bartoszek@ip-10-4-X-X ~]$ yarn container -list
> appattempt_1538570922803_0020_02
> 18/10/08 10:16:37 INFO client.RMProxy: Connecting to ResourceManager at
> ip-10-4-X-X.eu-west-1.compute.internal/10.4.X.X:8032
> Total number of containers :1
>   Container-Id   Start Time  Finish Time
>  State HostNode Http Address
> LOG-URL
> container_1538570922803_0020_02_01 Mon Oct 08 09:47:17 + 2018
>  N/A  RUNNING
> ip-10-4-X-X.eu-west-1.compute.internal:8041
> http://ip-10-4-X-X.eu-west-1.compute.internal:8042
> http://ip-10-4-X-X.eu-west-1.compute.internal:8042/node/containerlogs/container_1538570922803_0020_02_01/pawel_bartoszek
>


Large rocksdb state restore/checkpoint duration behavior

2018-10-10 Thread Aminouvic
Hi,

We are using Flink 1.6.1 on yarn with rocksdb as backend incrementally
checkpointed to hdfs (for data and timers).

The job reads events from Kafka (~1 billion events per day), constructs user
sessions using an EventTimeSessionWindow coupled with a late-firing trigger
and a WindowFunction with AggregatingState (a few minutes gap, 1 day allowed
lateness, ~1 TB state) to produce results back into Kafka (~200 million
events per day).

When trying to restart the job for maintenance (stopped the cluster for 1
hour), the restore duration took several hours.

Task metrics showed that no new data was read from Kafka, but the job
produced data out.

Also, sometimes, the job seems to freeze (no data in/out) while performing
long checkpoints (~20 minutes)

When we try to cancel the job it takes several minutes before stopping and
logs show the following :
:
2018-10-09 11:53:53,348 WARN  org.apache.flink.runtime.taskmanager.Task 
   
- Task 'window-function -> (Sink: kafka-sink, Sink: kafka-late-sink) (1/60)'
did not react to cancelling signal for 30 seconds, but is stuck in method:
 org.rocksdb.RocksDB.get(Native Method)
org.rocksdb.RocksDB.get(RocksDB.java:810)
org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:120)
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112)
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:452)
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746)
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
java.lang.Thread.run(Thread.java:745)

Any ideas on this ?

Regards,
Amine




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: cannot resolve constructor when invoke FlinkKafkaProducer011 constructor in scala

2018-10-10 Thread 远远
IDEA only shows the "cannot resolve constructor" error message, without any other
error message.

Dawid Wysakowicz  wrote on Wed, Oct 10, 2018 at 5:55 PM:

> Hi,
>
> what is the exact error message you are getting?
>
> Best,
>
> Dawid
>
> On 10/10/18 11:51, 远远 wrote:
>
> invoke FlinkKafkaProducer011 constructor in scala:
>
> val producer = new 
> FlinkKafkaProducer011[PVEvent.Entity](appConf.getPvEventTopic, new 
> PvEventSerializeSchema,producerProps, 
> Optional.of(FlinkRebalancePartitioner[PVEvent.Entity]))
>
> and the constructor is :
>
> /** * Creates a FlinkKafkaProducer for a given topic. The sink produces its 
> input to * the topic. It accepts a keyed {@link KeyedSerializationSchema} and 
> possibly a custom {@link FlinkKafkaPartitioner}. * * If a partitioner is 
> not provided, written records will be partitioned by the attached key of each 
> * record (as determined by {@link 
> KeyedSerializationSchema#serializeKey(Object)}). If written records do not * 
> have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} 
> returns {@code null}), they * will be distributed to Kafka partitions in a 
> round-robin fashion. * * @param defaultTopicId The default topic to write 
> data to * @param serializationSchema A serializable serialization schema for 
> turning user objects into a kafka-consumable byte[] supporting key/value 
> messages * @param producerConfig Configuration properties for the 
> KafkaProducer. 'bootstrap.servers.' is the only required argument. * @param 
> customPartitioner A serializable partitioner for assigning messages to Kafka 
> partitions. *  If a partitioner is not provided, 
> records will be partitioned by the key of each record *   
>(determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If 
> the keys *  are {@code null}, then records will be 
> distributed to Kafka partitions in a *  round-robin 
> fashion. */public FlinkKafkaProducer011(
>   String defaultTopicId,  KeyedSerializationSchema<IN> serializationSchema,
>   Properties producerConfig,
>   Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
>this(
>   defaultTopicId,  serializationSchema,  producerConfig,  
> customPartitioner,  Semantic.AT_LEAST_ONCE,  
> DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);}
>
> but it does not compile, and IDEA shows a "cannot resolve constructor"
> error.
> When I invoke another constructor without the java8 Optional params, there is
> no error. Is it because of the java8 Optional param? What should I do?
>
>
>


Re: cannot resolve constructor when invoke FlinkKafkaProducer011 constructor in scala

2018-10-10 Thread Dawid Wysakowicz
Hi,

what is the exact error message you are getting?

Best,

Dawid


On 10/10/18 11:51, 远远 wrote:
> invoke FlinkKafkaProducer011 constructor in scala:
> val producer = new 
> FlinkKafkaProducer011[PVEvent.Entity](appConf.getPvEventTopic, new 
> PvEventSerializeSchema, producerProps, 
> Optional.of(FlinkRebalancePartitioner[PVEvent.Entity]))
> and the constructor is :
>
> /** * Creates a FlinkKafkaProducer for a given topic. The sink
> produces its input to * the topic. It accepts a keyed {@link
> KeyedSerializationSchema} and possibly a custom {@link
> FlinkKafkaPartitioner}. * * If a partitioner is not provided,
> written records will be partitioned by the attached key of each *
> record (as determined by {@link
> KeyedSerializationSchema#serializeKey(Object)}). If written records do
> not * have a key (i.e., {@link
> KeyedSerializationSchema#serializeKey(Object)} returns {@code null}),
> they * will be distributed to Kafka partitions in a round-robin
> fashion. * * @param defaultTopicId The default topic to write data to
> * @param serializationSchema A serializable serialization schema for
> turning user objects into a kafka-consumable byte[] supporting
> key/value messages * @param producerConfig Configuration properties
> for the KafkaProducer. 'bootstrap.servers.' is the only required
> argument. * @param customPartitioner A serializable partitioner for
> assigning messages to Kafka partitions. * If a partitioner is not
> provided, records will be partitioned by the key of each record *
> (determined by {@link KeyedSerializationSchema#serializeKey(Object)}).
> If the keys * are {@code null}, then records will be distributed to
> Kafka partitions in a * round-robin fashion. */ public
> FlinkKafkaProducer011(
>   String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema,
>   Properties producerConfig,
>   Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
>this(
>   defaultTopicId, serializationSchema, producerConfig, customPartitioner, 
> Semantic.AT_LEAST_ONCE, DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); }
> but it does not compile, and IDEA shows a "cannot resolve constructor"
> error.
> When I invoke another constructor without the java8 Optional params, there is
> no error. Is it because of the java8 Optional param? What should I do?
>





Re: Error restoring from savepoint while there's no modification to the job

2018-10-10 Thread Averell
Hi Kostas,

No, the same code was used.
I (1) started the job, (2) created a savepoint, (3) cancelled the job, (4)
restored the job with the same command as in (1) with the addition "-s
".

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


cannot resolve constructor when invoke FlinkKafkaProducer011 constructor in scala

2018-10-10 Thread 远远
invoke FlinkKafkaProducer011 constructor in scala:

val producer = new
FlinkKafkaProducer011[PVEvent.Entity](appConf.getPvEventTopic, new
PvEventSerializeSchema,
producerProps, Optional.of(FlinkRebalancePartitioner[PVEvent.Entity]))

and the constructor is :

/**
 * Creates a FlinkKafkaProducer for a given topic. The sink produces
its input to
 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and
possibly a custom {@link FlinkKafkaPartitioner}.
 *
 * If a partitioner is not provided, written records will be
partitioned by the attached key of each
 * record (as determined by {@link
KeyedSerializationSchema#serializeKey(Object)}). If written records do
not
 * have a key (i.e., {@link
KeyedSerializationSchema#serializeKey(Object)} returns {@code null}),
they
 * will be distributed to Kafka partitions in a round-robin fashion.
 *
 * @param defaultTopicId The default topic to write data to
 * @param serializationSchema A serializable serialization schema for
turning user objects into a kafka-consumable byte[] supporting
key/value messages
 * @param producerConfig Configuration properties for the
KafkaProducer. 'bootstrap.servers.' is the only required argument.
 * @param customPartitioner A serializable partitioner for assigning
messages to Kafka partitions.
 *  If a partitioner is not provided, records
will be partitioned by the key of each record
 *  (determined by {@link
KeyedSerializationSchema#serializeKey(Object)}). If the keys
 *  are {@code null}, then records will be
distributed to Kafka partitions in a
 *  round-robin fashion.
 */
public FlinkKafkaProducer011(
  String defaultTopicId,
  KeyedSerializationSchema<IN> serializationSchema,
  Properties producerConfig,
  Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
   this(
  defaultTopicId,
  serializationSchema,
  producerConfig,
  customPartitioner,
  Semantic.AT_LEAST_ONCE,
  DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}

but it does not compile, and IDEA shows a "cannot resolve constructor" error.
When I invoke another constructor without the java8 Optional parameter, there is
no error. Is this because of the java8 Optional parameter? What should I do?
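
For reference, here is a minimal Scala sketch of how this constructor can be invoked. This is
only a guess at the cause, not a verified fix: java.util.Optional is invariant, so an
Optional[FlinkRebalancePartitioner[PVEvent.Entity]] does not match the expected
Optional[FlinkKafkaPartitioner[PVEvent.Entity]] from Scala; instantiating the partitioner and
pinning the Optional's type parameter usually avoids that. PVEvent.Entity, PvEventSerializeSchema,
FlinkRebalancePartitioner, appConf and producerProps are the names from the question above.

import java.util.Optional

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner

// Instantiate the custom partitioner and type it as the base class expected by the
// constructor, so that the Optional's type parameter matches exactly.
val partitioner: FlinkKafkaPartitioner[PVEvent.Entity] =
  new FlinkRebalancePartitioner[PVEvent.Entity]()

val producer = new FlinkKafkaProducer011[PVEvent.Entity](
  appConf.getPvEventTopic,
  new PvEventSerializeSchema,
  producerProps,
  Optional.of[FlinkKafkaPartitioner[PVEvent.Entity]](partitioner))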


Re: Error restoring from savepoint while there's no modification to the job

2018-10-10 Thread Kostas Kloudas
Are you restoring your job with the custom source from a savepoint that was taken without the
custom source?


> On Oct 10, 2018, at 11:34 AM, Averell  wrote:
> 
> Hi Kostas,
> 
> Yes, I modified ContinuousFileMonitoringFunction to add one more
> ListState. The error might/should have come from that, but I haven't
> been able to find out why.
> 
> All of my keyed streams are defined by Scala tuples, like keyBy(r =>
> (r.customer_id, r.address)), and the fields used as keys are of types
> either String or Long. For this, I don't have to define equals and hashCode
> methods, do I?
> 
> Thanks and best regards,
> Averell
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Error restoring from savepoint while there's no modification to the job

2018-10-10 Thread Averell
Hi Kostas,

Yes, I modified ContinuousFileMonitoringFunction to add one more
ListState. The error might/should have come from that, but I haven't
been able to find out why.

All of my keyed streams are defined by Scala tuples, like keyBy(r =>
(r.customer_id, r.address)), and the fields used as keys are of types
either String or Long. For this, I don't have to define equals and hashCode
methods, do I?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-10 Thread Fabian Hueske
Hi Xuefu,

Welcome to the Flink community and thanks for starting this discussion!
Better Hive integration would be really great!
Can you go into details of what you are proposing? I can think of a couple
ways to improve Flink in that regard:

* Support for Hive UDFs
* Support for Hive metadata catalog
* Support for HiveQL syntax
* ???

Best, Fabian

On Tue, 9 Oct 2018 at 19:22, Zhang, Xuefu <xuef...@alibaba-inc.com> wrote:

> Hi all,
>
> Along with the community's effort, inside Alibaba we have explored Flink's
> potential as an execution engine not just for stream processing but also
> for batch processing. We are encouraged by our findings and have initiated
> our effort to make Flink's SQL capabilities full-fledged. When comparing
> what's available in Flink to the offerings from competing data processing
> engines, we identified a major gap in Flink: good integration with the Hive
> ecosystem. This is crucial to the success of Flink SQL and batch due to the
> well-established data ecosystem around Hive. Therefore, we have done some
> initial work along this direction, but there is still a lot of effort
> needed.
>
> We have two strategies in mind. The first one is to make Flink SQL
> full-fledged and well-integrated with Hive ecosystem. This is a similar
> approach to what Spark SQL adopted. The second strategy is to make Hive
> itself work with Flink, similar to the proposal in [1]. Each approach bears
> its pros and cons, but they don’t need to be mutually exclusive with each
> targeting at different users and use cases. We believe that both will
> promote a much greater adoption of Flink beyond stream processing.
>
> We have been focused on the first approach and would like to showcase
> Flink's batch and SQL capabilities with Flink SQL. However, we have also
> planned to start strategy #2 as the follow-up effort.
>
> I'm completely new to Flink (a short bio [2] is below), though many of
> my colleagues here at Alibaba are long-time contributors. Nevertheless, I'd
> like to share our thoughts and invite your early feedback. At the same
> time, I am working on a detailed proposal on Flink SQL's integration with
> Hive ecosystem, which will be also shared when ready.
>
> While the ideas are simple, each approach will demand significant effort,
> more than what we can afford. Thus, the input and contributions from the
> communities are greatly welcome and appreciated.
>
> Regards,
>
>
> Xuefu
>
> References:
>
> [1] https://issues.apache.org/jira/browse/HIVE-10712
> [2] Xuefu Zhang is a long-time open source veteran, worked or working on
> many projects under Apache Foundation, of which he is also an honored
> member. About 10 years ago he worked in the Hadoop team at Yahoo where the
> projects just got started. Later he worked at Cloudera, initiating and
> leading the development of Hive on Spark project in the communities and
> across many organizations. Prior to joining Alibaba, he worked at Uber
> where he promoted Hive on Spark to all Uber's SQL on Hadoop workload and
> significantly improved Uber's cluster efficiency.
>
>
>


Re: getRuntimeContext(): The runtime context has not been initialized.

2018-10-10 Thread Fabian Hueske
Yes, it would be good to post your code.
Are you using a FoldFunction in a window (if yes, what window) or as a
running aggregate?

In general, collecting state in a FoldFunction is usually not something
that you should do. Did you consider using an AggregateFunction?

Fabian
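
As a small illustration of that suggestion, here is a minimal Scala sketch of an
AggregateFunction that tracks the distinct products seen in a window (ProductEvent and its
productId field are hypothetical names, not taken from the original code):

import org.apache.flink.api.common.functions.AggregateFunction

// Accumulates the set of distinct product ids seen in a window and emits its size.
class DistinctProductCount extends AggregateFunction[ProductEvent, Set[String], Long] {
  override def createAccumulator(): Set[String] = Set.empty[String]
  override def add(event: ProductEvent, acc: Set[String]): Set[String] = acc + event.productId
  override def getResult(acc: Set[String]): Long = acc.size.toLong
  override def merge(a: Set[String], b: Set[String]): Set[String] = a ++ b
}

// Applied on a keyed, windowed stream, for example:
// stream.keyBy(...).timeWindow(Time.hours(24)).aggregate(new DistinctProductCount)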

On Wed, 10 Oct 2018 at 11:08, Chesnay Schepler <ches...@apache.org> wrote:

> In which method are you calling getRuntimeContext()? This method can only
> be used after open() has been called.
>
> On 09.10.2018 17:09, Ahmad Hassan wrote:
>
> Hi,
>
> We want to use MapState inside a fold function to keep the map of all
> products that we see in a 24 hour window, to store the huge state in RocksDB
> rather than overflowing the heap. However, I can't seem to initialise the MapState
> within the FoldFunction or in any class that extends RichMapFunction:
>
> private transient MapStateDescriptor<String, String> descr = new
> MapStateDescriptor<>("mymap", String.class, String.class);
> this.getRuntimeContext().getMapState(descr);
>
> I get error
>
> java.lang.IllegalStateException: The runtime context has not been
> initialized.
> at
> org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
>
>
> Any clues how to get the runtime context please?
>
> Thanks.
>
> Best regards
>
>
>


Re: getRuntimeContext(): The runtime context has not been initialized.

2018-10-10 Thread Chesnay Schepler
In which method are you calling getRuntimeContext()? This method can 
only be used after open() has been called.
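
For illustration, here is a minimal Scala sketch of initialising MapState lazily in open(),
which only works on a keyed stream; the class and field names are made up, not taken from the
original code:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class ProductTracker extends RichFlatMapFunction[(String, String), String] {

  // Declared here, but not initialised via getRuntimeContext() in a field initialiser.
  private var products: MapState[String, String] = _

  override def open(parameters: Configuration): Unit = {
    // The runtime context is available once open() is called.
    val descriptor = new MapStateDescriptor[String, String](
      "mymap", classOf[String], classOf[String])
    products = getRuntimeContext.getMapState(descriptor)
  }

  override def flatMap(value: (String, String), out: Collector[String]): Unit = {
    products.put(value._1, value._2)
    out.collect(value._1)
  }
}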


On 09.10.2018 17:09, Ahmad Hassan wrote:

Hi,

We want to use MapState inside a fold function to keep the map of all
products that we see in a 24 hour window, to store the huge state in RocksDB
rather than overflowing the heap. However, I can't seem to initialise the
MapState within the FoldFunction or in any class that extends
RichMapFunction:


private transient MapStateDescriptor<String, String> descr = new
MapStateDescriptor<>("mymap", String.class, String.class);

this.getRuntimeContext().getMapState(descr);

I get error

java.lang.IllegalStateException: The runtime context has not been 
initialized.
at 
org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)



Any clues how to get the runtime context please?

Thanks.

Best regards





Re: Error restoring from savepoint while there's no modification to the job

2018-10-10 Thread Kostas Kloudas
Hi Averell,

In the logs there are some "Split Reader: Custom File Source" entries.
Is this a custom source that you implemented?
Also, is your keySelector deterministic, with proper equals and hashCode methods?

Cheers,
Kostas

> On Oct 10, 2018, at 10:50 AM, Averell  wrote:
> 
> Hi Stefan, Dawid,
> 
> I hadn't changed anything in the configuration. Env's parallelism stayed at
> 64. Some source/sink operators have parallelism of 1 to 8. I'm using Flink
> 1.7-SNAPSHOT, with the code pulled from the master branch about 5 days back.
> Savepoint was saved to either S3 or HDFS (I tried multiple times), and had
> not been moved.
> 
> Is there any kind of improper user code that can cause such an error?
> 
> Thanks for your support.
> 
> Best regards,
> Averell
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Wondering how to check if taskmanager is registered with the jobmanager, without asking job manager

2018-10-10 Thread Piotr Nowojski
You’re welcome :)

> On 10 Oct 2018, at 10:28, Bariša  wrote:
> 
> Thnx Piotr. I agree, that would work. It's a bit of a chicken-and-egg problem, 
> since at that point we can't just spin up a task manager and have it 
> register itself; we need the JobManager health check to know how many task 
> managers should be there. A bit more logic, but doable. Thnx for the tip.
> 
> Cheers,
> Barisa
> 
> On Wed, 10 Oct 2018 at 09:05, Piotr Nowojski  > wrote:
> Hi,
> 
> I don’t think that’s exposed on the TaskManager.
> 
> Maybe it would simplify things a bit if you implement this as a single 
> “JobManager” health check, not multiple TaskManagers health check - for 
> example verify that there are expected number of registered TaskManagers. It 
> might cover your case.
> 
> Piotrek
> 
>> On 9 Oct 2018, at 12:21, Bariša > > wrote:
>> 
>> As part of deploying task managers and job managers, I'd like to expose 
>> healthcheck on both task managers and job managers.
>> 
>> For the task managers, one of the requirements that they are healthy, is 
>> that they have successfully registered themselves with the job manager.
>> 
>> Is there a way to achieve this, without making a call to job manager ( to do 
>> that, I first need to make a call to the zookeeper to find the job manager, 
>> so I'm trying to simplify the health check ).
>> 
>> Ideally, taskmanager would have a metric that says, ( am registered ), but 
>> afaik, that doesn't exist 
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#cluster
>>  
>> 
>> 
>> 
>> P.S.
>> This is my first post in the email list, happy to update/change my question, 
>> if I messed up, or misunderstood something.
>> 
>> Cheers,
>> Barisa
> 



Re: Error restoring from savepoint while there's no modification to the job

2018-10-10 Thread Averell
Hi Stefan, Dawid,

I hadn't changed anything in the configuration. Env's parallelism stayed at
64. Some source/sink operators have parallelism of 1 to 8. I'm using Flink
1.7-SNAPSHOT, with the code pulled from the master branch about 5 days back.
Savepoint was saved to either S3 or HDFS (I tried multiple times), and had
not been moved.

Is there any kind of improper user code that can cause such an error?

Thanks for your support.

Best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Error restoring from savepoint while there's no modification to the job

2018-10-10 Thread Le-Van Huyen
Hi Stefan, Dawid,

I hadn't changed anything in the configuration. Env's parallelism stayed at
64. Some source/sink operators have parallelism of 1 to 8. I'm using Flink
1.7-SNAPSHOT, with the code pulled from master about 5 days back. Savepoint
was saved to either S3 or HDFS (I tried multiple times), and had not been
moved.

Is there any kind of improper user code that can cause such an error?

Thanks and best regards,
Averell

On Wed, Oct 10, 2018 at 7:02 PM Stefan Richter 
wrote:

> Hi,
>
> adding to Dawids questions, it would also be very helpful to know which
> Flink version was used to create the savepoint, which Flink version was
> used in the restore attempt, if the savepoint was moved or modified.
> Outside of potential conflicts with those things, I would not expect
> anything like this.
>
> Best,
> Stefan
>
> > On 10. Oct 2018, at 09:51, Dawid Wysakowicz 
> wrote:
> >
> > Hi Averell,
> >
> > Do you try to scale the job up, meaning do you increase the job
> > parallelism? Have you increased the job max parallelism by chance? If so
> > this is not supported. The max parallelism parameter is used to create
> > key groups that can be further assigned to parallel operators. This
> > parameter cannot be changed for a job that shall be restored.
> >
> > If this is not the case, maybe Stefan(cc) have some ideas, what can go
> > wrong.
> >
> > Best,
> >
> > Dawid
> >
> >
> > On 10/10/18 09:23, Averell wrote:
> >> Hi everyone,
> >>
> >> I'm getting the following error when trying to restore from a savepoint.
> >> Here below is the output from flink bin, and in the attachment is a TM
> log.
> >> I didn't have any change in the app before and after savepoint. All
> Window
> >> operators have been assigned unique ID string.
> >>
> >> Could you please help give a look?
> >>
> >> Thanks and best regards,
> >> Averell
> >>
> >> taskmanager.gz
> >> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/taskmanager.gz>
>
> >>
> >> org.apache.flink.client.program.ProgramInvocationException: Job failed.
> >> (JobID: 606ad5239f5e23cedb85d3e75bf76463)
> >>  at
> >>
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
> >>  at
> >>
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
> >>  at
> >>
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> >>  at
> >>
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:664)
> >>  at
> >>
> com.nbnco.csa.analysis.copper.sdc.flink.StreamingSdcWithAverageByDslam$.main(StreamingSdcWithAverageByDslam.scala:442)
> >>  at
> >>
> com.nbnco.csa.analysis.copper.sdc.flink.StreamingSdcWithAverageByDslam.main(StreamingSdcWithAverageByDslam.scala)
> >>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>  at
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>  at
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>  at java.lang.reflect.Method.invoke(Method.java:498)
> >>  at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> >>  at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> >>  at
> >>
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> >>  at
> >>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> >>  at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> >>  at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> >>  at
> >>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> >>  at
> >>
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> >>  at java.security.AccessController.doPrivileged(Native Method)
> >>  at javax.security.auth.Subject.doAs(Subject.java:422)
> >>  at
> >>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
> >>  at
> >>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >>  at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> >> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> >> execution failed.
> >>  at
> >>
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> >>  at
> >>
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
> >>  ... 22 more
> >> Caused by: java.lang.Exception: Exception while creating
> >> StreamOperatorStateContext.
> >>  at
> >>
> 

Re: Wondering how to check if taskmanager is registered with the jobmanager, without asking job manager

2018-10-10 Thread Bariša
Thnx Piotr. I agree, that would work. It's a bit of a chicken-and-egg
problem: at that point we can't just spin up a task manager and have it
register itself; we need the JobManager health check to know how many task
managers should be there. A bit more logic, but doable. Thnx for the tip.

Cheers,
Barisa

On Wed, 10 Oct 2018 at 09:05, Piotr Nowojski 
wrote:

> Hi,
>
> I don’t think that’s exposed on the TaskManager.
>
> Maybe it would simplify things a bit if you implement this as a single
> “JobManager” health check, not multiple TaskManagers health check - for
> example verify that there are expected number of registered TaskManagers.
> It might cover your case.
>
> Piotrek
>
> On 9 Oct 2018, at 12:21, Bariša  wrote:
>
> As part of deploying task managers and job managers, I'd like to expose
> healthcheck on both task managers and job managers.
>
> For the task managers, one of the requirements that they are healthy, is
> that they have successfully registered themselves with the job manager.
>
> Is there a way to achieve this, without making a call to job manager ( to
> do that, I first need to make a call to the zookeeper to find the job
> manager, so I'm trying to simplify the health check ).
>
> Ideally, taskmanager would have a metric that says, ( am registered ), but
> afaik, that doesn't exist
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#cluster
>
>
> P.S.
> This is my first post in the email list, happy to update/change my
> question, if I messed up, or misunderstood something.
>
> Cheers,
> Barisa
>
>
>


Re: JobManager did not respond within 60000 ms

2018-10-10 Thread Piotr Nowojski
Hi again,

Glad that you solved your problem :)

Splitting code into smaller functions has its advantages, but more 
operators/tasks means more overhead for JobManager/TaskManager to manage them. 
Usually that’s not a big issue, but as I said, you were running your cluster on 
extremely low memory settings.

Piotrek
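
As a tiny illustration of that trade-off, two chained FilterFunctions can be collapsed into a
single operator. The stream and predicates below are hypothetical, just to show the shape:

import org.apache.flink.streaming.api.scala._

val events: DataStream[String] = ???                    // hypothetical input stream
def isWellFormed(e: String): Boolean = e.nonEmpty       // hypothetical predicate
def isRelevant(e: String): Boolean = e.startsWith("A")  // hypothetical predicate

// Two operators, each with its own functional meaning:
val filtered = events
  .filter(e => isWellFormed(e))
  .filter(e => isRelevant(e))

// The same work in one operator, which keeps the job graph smaller:
val filteredCompact = events
  .filter(e => isWellFormed(e) && isRelevant(e))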

> On 9 Oct 2018, at 18:09, jpreis...@free.fr wrote:
> 
> Hi Piotrek,
> 
> Thank you for your answer. Actually it was necessary to increase the memory 
> of the JobManager (I had tested it but I had not restarted Flink ...).
> 
> I will also work on optimization. I thought it was good practice to create as 
> many functions as possible based on their functional value (for example: 
> create two FilterFunctions that have different functional meanings). So I 
> will try to have fewer functions (for example: combine my two FilterFunctions 
> into one).
> 
> Thanks again Piotrek !
> 
> Julien.
> 
- Original message -
From: "Piotr Nowojski" 
To: jpreis...@free.fr
Cc: user@flink.apache.org
Sent: Tuesday, 9 October 2018 10:37:58
Subject: Re: JobManager did not respond within 60000 ms
> 
> Hi, 
> 
> 
> You have quite complicated job graph and very low memory settings for the job 
> manager and task manager. It might be that long GC pauses are causing this 
> problem. 
> 
> 
> Secondly, there are quite some results in google search of this error that 
> points toward high-availability issues. Have you read those previously 
> reported problems? 
> 
> 
> Thanks, Piotrek 
> 
> 
> 
> 
> 
> On 9 Oct 2018, at 09:57, jpreis...@free.fr wrote: 
> 
> 
> I have a streaming job that works in standalone cluster. Flink version is 
> 1.4.1. Everything was working so far. But since I added new treatments, I can 
> not start my job anymore. I have this exception : 
> 
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: JobManager did not respond within 60000 ms 
> at 
> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:524)
>  
> at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:103)
>  
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456) 
> at 
> org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
>  
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:402) 
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802) 
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282) 
> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054) 
> at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101) 
> at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098) 
> at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>  
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098) 
> Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager 
> did not respond within 60000 ms 
> at 
> org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:437)
>  
> at 
> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:516)
>  
> ... 11 more 
> Caused by: java.util.concurrent.TimeoutException 
> at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) 
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) 
> at 
> org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:435)
>  
> ... 12 more 
> 
> I see a very strange behavior: when I comment out a function (any one, for 
> example a FilterFunction that was present before or after my modification), the job starts. 
> I tried to change the configuration (akka.client.timeout and akka.framesize) 
> without success. 
> 
> This is my flink-conf.yaml 
> jobmanager.rpc.address: myhost 
> jobmanager.rpc.port: 6123 
> jobmanager.heap.mb: 128 
> taskmanager.heap.mb: 1024 
> taskmanager.numberOfTaskSlots: 100 
> taskmanager.memory.preallocate: false 
> taskmanager.data.port: 6121 
> parallelism.default: 1 
> taskmanager.tmp.dirs: /dohdev/flink/tmp/tskmgr 
> blob.storage.directory: /dohdev/flink/tmp/blob 
> jobmanager.web.port: -1 
> high-availability: zookeeper 
> high-availability.zookeeper.quorum: localhost:2181 
> high-availability.zookeeper.path.root: /dohdev/flink 
> high-availability.cluster-id: dev 
> high-availability.storageDir: file:mnt/metaflink 
> high-availability.zookeeper.storageDir: 
> /mnt/metaflink/inh/agregateur/recovery 
> restart-strategy: fixed-delay 
> restart-strategy.fixed-delay.attempts: 1000 
> restart-strategy.fixed-delay.delay: 5 s 
> zookeeper.sasl.disable: true 
> blob.service.cleanup.interval: 60 
> 
> And I launch a job with this command : bin/flink run -d myjar.jar 
> 
> I added as an attachment a graph of my job when it works (Graph.PNG). 
> 
> Do you have an idea of the problem ? 
> 
> Thanks. 
> Julien 
> 

Re: Flink leaves a lot RocksDB sst files in tmp directory

2018-10-10 Thread Piotr Nowojski
Hi,

Was this also happening in older Flink versions? Could you describe the circumstances 
under which the job was moved to a new TM (full JobManager and TaskManager logs 
would be helpful)? I suspect that those leftover files might have 
something to do with local recovery.

Piotrek 

> On 9 Oct 2018, at 15:28, Sayat Satybaldiyev  wrote:
> 
> After digging more in the log, I think it's more likely a bug. I've grepped the log by 
> job id and found that, under normal circumstances, the TM is supposed to delete the flink-io 
> files. For some reason, it doesn't delete the files that were listed above.
> 
> 2018-10-08 22:10:25,865 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Deleting 
> existing instance base directory 
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_92266bd138cd7d51ac7a63beeb86d5f5__1_1__uuid_bf69685b-78d3-431c-88be-b3f26db05566.
> 2018-10-08 22:10:25,867 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Deleting 
> existing instance base directory 
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_14630a50145935222dbee3f1bcfdc2a6__1_1__uuid_47cd6e95-144a-4c52-a905-52966a5e9381.
> 2018-10-08 22:10:25,874 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Deleting 
> existing instance base directory 
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_7185aa35d035b12c70cf490077378540__1_1__uuid_7c539a96-a247-4299-b1a0-01df713c3c34.
> 2018-10-08 22:17:38,680 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Close 
> JobManager connection for job a5b223c7aee89845f9aed24012e46b7e.
> org.apache.flink.util.FlinkException: JobManager responsible for 
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> org.apache.flink.util.FlinkException: JobManager responsible for 
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> 2018-10-08 22:17:38,686 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Deleting 
> existing instance base directory 
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_7185aa35d035b12c70cf490077378540__1_1__uuid_2e88c56a-2fc2-41f2-a1b9-3b0594f660fb.
> org.apache.flink.util.FlinkException: JobManager responsible for 
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> 2018-10-08 22:17:38,691 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Deleting 
> existing instance base directory 
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_92266bd138cd7d51ac7a63beeb86d5f5__1_1__uuid_b44aecb7-ba16-4aa4-b709-31dae7f58de9.
> org.apache.flink.util.FlinkException: JobManager responsible for 
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> org.apache.flink.util.FlinkException: JobManager responsible for 
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> org.apache.flink.util.FlinkException: JobManager responsible for 
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> 
> 
> On Tue, Oct 9, 2018 at 2:33 PM Sayat Satybaldiyev  > wrote:
> Dear all,
> 
> While running Flink 1.6.1 with RocksDB as a backend and HDFS as the checkpoint 
> FS, I've noticed that after a job has moved to a different host it leaves 
> quite a huge state in the temp folder (1.2TB in total). The files are not used, as 
> the TM is no longer running the job on that host. 
> 
> The job a5b223c7aee89845f9aed24012e46b7e had been running on the host but 
> then it was moved to a different TM. I'm wondering whether this is intended behavior or 
> a possible bug?
> 
> I've attached a screenshot of the files that are left over and not used by the job.



Re: Wondering how to check if taskmanager is registered with the jobmanager, without asking job manager

2018-10-10 Thread Piotr Nowojski
Hi,

I don’t think that’s exposed on the TaskManager.

Maybe it would simplify things a bit if you implement this as a single 
"JobManager" health check instead of multiple TaskManager health checks - for example, 
verify that the expected number of TaskManagers is registered. It might 
cover your case.

Piotrek
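
For what it's worth, a rough sketch of such a JobManager-side check against the monitoring REST
API. It assumes the REST endpoint is reachable on the default port 8081, uses naive string
counting instead of a JSON parser, and the hostname and EXPECTED_TASK_MANAGERS variable are
made up for the example:

import scala.io.Source

val expected = sys.env.getOrElse("EXPECTED_TASK_MANAGERS", "1").toInt

// GET /taskmanagers returns a JSON document with one entry per registered TaskManager.
val response = Source.fromURL("http://jobmanager:8081/taskmanagers").mkString
val registered = "\"id\"".r.findAllIn(response).size

if (registered >= expected) {
  println(s"healthy: $registered of $expected task managers registered")
} else {
  println(s"unhealthy: $registered of $expected task managers registered")
  sys.exit(1)
}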

> On 9 Oct 2018, at 12:21, Bariša  wrote:
> 
> As part of deploying task managers and job managers, I'd like to expose 
> healthcheck on both task managers and job managers.
> 
> For the task managers, one of the requirements that they are healthy, is that 
> they have successfully registered themselves with the job manager.
> 
> Is there a way to achieve this, without making a call to job manager ( to do 
> that, I first need to make a call to the zookeeper to find the job manager, 
> so I'm trying to simplify the health check ).
> 
> Ideally, taskmanager would have a metric that says, ( am registered ), but 
> afaik, that doesn't exist 
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#cluster
>  
> 
> 
> 
> P.S.
> This is my first post in the email list, happy to update/change my question, 
> if I messed up, or misunderstood something.
> 
> Cheers,
> Barisa



Re: Error restoring from savepoint while there's no modification to the job

2018-10-10 Thread Stefan Richter
Hi,

adding to Dawids questions, it would also be very helpful to know which Flink 
version was used to create the savepoint, which Flink version was used in the 
restore attempt, if the savepoint was moved or modified. Outside of potential 
conflicts with those things, I would not expect anything like this.

Best,
Stefan

> On 10. Oct 2018, at 09:51, Dawid Wysakowicz  wrote:
> 
> Hi Averell,
> 
> Do you try to scale the job up, meaning do you increase the job
> parallelism? Have you increased the job max parallelism by chance? If so
> this is not supported. The max parallelism parameter is used to create
> key groups that can be further assigned to parallel operators. This
> parameter cannot be changed for a job that shall be restored.
> 
> If this is not the case, maybe Stefan(cc) have some ideas, what can go
> wrong.
> 
> Best,
> 
> Dawid
> 
> 
> On 10/10/18 09:23, Averell wrote:
>> Hi everyone,
>> 
>> I'm getting the following error when trying to restore from a savepoint.
>> Here below is the output from flink bin, and in the attachment is a TM log.
>> I didn't have any change in the app before and after savepoint. All Window
>> operators have been assigned unique ID string.
>> 
>> Could you please help give a look?
>> 
>> Thanks and best regards,
>> Averell
>> 
>> taskmanager.gz
>> 
>>   
>> 
>> org.apache.flink.client.program.ProgramInvocationException: Job failed.
>> (JobID: 606ad5239f5e23cedb85d3e75bf76463)
>>  at
>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>>  at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
>>  at
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>  at
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:664)
>>  at
>> com.nbnco.csa.analysis.copper.sdc.flink.StreamingSdcWithAverageByDslam$.main(StreamingSdcWithAverageByDslam.scala:442)
>>  at
>> com.nbnco.csa.analysis.copper.sdc.flink.StreamingSdcWithAverageByDslam.main(StreamingSdcWithAverageByDslam.scala)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>  at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>  at java.lang.reflect.Method.invoke(Method.java:498)
>>  at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>>  at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>  at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>>  at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>>  at 
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>>  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>  at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>  at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>  at java.security.AccessController.doPrivileged(Native Method)
>>  at javax.security.auth.Subject.doAs(Subject.java:422)
>>  at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>>  at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>>  at
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>  at
>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>>  ... 22 more
>> Caused by: java.lang.Exception: Exception while creating
>> StreamOperatorStateContext.
>>  at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>>  at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
>> state backend for WindowOperator_b7287b12f90aa788ab162856424c6d40_(8/64)
>> from any of the 1 provided restore options.
>>  at
>> 

Re: Error restoring from savepoint while there's no modification to the job

2018-10-10 Thread Dawid Wysakowicz
Hi Averell,

Are you trying to scale the job up, meaning are you increasing the job
parallelism? Have you increased the job's max parallelism by chance? If so,
this is not supported. The max parallelism parameter is used to create
key groups that can be further assigned to parallel operators. This
parameter cannot be changed for a job that is to be restored.

If this is not the case, maybe Stefan (cc'd) has some ideas about what can go
wrong.

Best,

Dawid
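
As a side note, a minimal sketch of pinning these values explicitly, so that the number of key
groups cannot drift between the run that took the savepoint and the restore (the concrete
numbers are only examples):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// The max parallelism determines the number of key groups and must stay the same
// across savepoint and restore; the parallelism itself may be changed.
env.setMaxParallelism(128)
env.setParallelism(64)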


On 10/10/18 09:23, Averell wrote:
> Hi everyone,
>
> I'm getting the following error when trying to restore from a savepoint.
> Here below is the output from flink bin, and in the attachment is a TM log.
> I didn't have any change in the app before and after savepoint. All Window
> operators have been assigned unique ID string.
>
> Could you please help give a look?
>
> Thanks and best regards,
> Averell
>
> taskmanager.gz
> 
>   
>
> org.apache.flink.client.program.ProgramInvocationException: Job failed.
> (JobID: 606ad5239f5e23cedb85d3e75bf76463)
>   at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>   at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
>   at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:664)
>   at
> com.nbnco.csa.analysis.copper.sdc.flink.StreamingSdcWithAverageByDslam$.main(StreamingSdcWithAverageByDslam.scala:442)
>   at
> com.nbnco.csa.analysis.copper.sdc.flink.StreamingSdcWithAverageByDslam.main(StreamingSdcWithAverageByDslam.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>   at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>   at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>   at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>   at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>   at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>   at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>   at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>   ... 22 more
> Caused by: java.lang.Exception: Exception while creating
> StreamOperatorStateContext.
>   at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>   at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for WindowOperator_b7287b12f90aa788ab162856424c6d40_(8/64)
> from any of the 1 provided restore options.
>   at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>   at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
>   at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
>   ... 5 more
> Caused by: java.lang.IllegalStateException: Unexpected key-group in 

Error restoring from savepoint while there's no modification to the job

2018-10-10 Thread Averell
Hi everyone,

I'm getting the following error when trying to restore from a savepoint.
Here below is the output from flink bin, and in the attachment is a TM log.
I didn't have any change in the app before and after savepoint. All Window
operators have been assigned unique ID string.

Could you please take a look?

Thanks and best regards,
Averell

taskmanager.gz

  

org.apache.flink.client.program.ProgramInvocationException: Job failed.
(JobID: 606ad5239f5e23cedb85d3e75bf76463)
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:664)
at
com.nbnco.csa.analysis.copper.sdc.flink.StreamingSdcWithAverageByDslam$.main(StreamingSdcWithAverageByDslam.scala:442)
at
com.nbnco.csa.analysis.copper.sdc.flink.StreamingSdcWithAverageByDslam.main(StreamingSdcWithAverageByDslam.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 22 more
Caused by: java.lang.Exception: Exception while creating
StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
state backend for WindowOperator_b7287b12f90aa788ab162856424c6d40_(8/64)
from any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
... 5 more
Caused by: java.lang.IllegalStateException: Unexpected key-group in restore.
at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:475)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:438)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:377)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:105)
at

Re: No data issued by flink window after a few hours

2018-10-10 Thread 潘 功森
Hi,

Because the default max state size is too small for one hour of data, and the max window size
is 24 hours, I used 500M.


MemoryStateBackend stateBackend = new MemoryStateBackend(MAX_STATE_SIZE);//500M
env.setStateBackend(stateBackend);

And I found that, irrespective of the configured maximal state size, the state cannot 
be larger than the akka frame size.
So I added a config entry in flink-conf.yaml:
akka.framesize: 524288000b

What else do I have to pay attention to?

Yours,
September


From: vino yang 
Sent: Wednesday, October 10, 2018 11:45:31 AM
To: pangong...@hotmail.com
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,

I saw the exception image you provided. Based on the exception message, it 
seems you used the default max state size (5MB).

You can specify the max state size to override the default value. Try :

MemoryStateBackend stateBackend = new MemoryStateBackend(theSizeOfBytes);

Please note that you need to reserve enough memory for Flink.

Thanks, vino.

潘 功森 <pangong...@hotmail.com> wrote on Wednesday, October 10, 2018 at 11:36 AM:
Please have a look at my last mail.

What happens when the cached window data is too large?

Yours,
September


From: vino yang <yanghua1...@gmail.com>
Sent: Wednesday, October 10, 2018 11:33:48 AM
To: pangong...@hotmail.com
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,

Did you mean "computer momery" referring to Memory Statebackend?
The Flink window mechanism is internally based on State, and this is done for 
fault tolerance.
If you introduce external storage, it will break its design and bring other 
problems.

Thanks, vino.

潘 功森 <pangong...@hotmail.com> wrote on Wednesday, October 10, 2018 at 11:02 AM:
Hi,
"ram to cache the distinct data about sliding window" means I used computer 
momery not the third part db to cache the data need used in window.
“the data need used in window” means :such as the sliding window is 1 hour, and 
I need to count the distinct users, I need to cache the user id about one hour.
Cause there’re no related errors.
Yours,
September


From: vino yang <yanghua1...@gmail.com>
Sent: Wednesday, October 10, 2018 10:49:43 AM
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,

Can you explain what "ram to cache the distinct data about sliding window" means?
The information you provided is too sparse to help others analyze the problem and 
give advice.

In addition, for questions about using Flink, please send mail only to the user 
mailing list.
The dev mailing list is mainly used to discuss development-related issues.

Thanks vino.

潘 功森 <pangong...@hotmail.com> wrote on Wednesday, October 10, 2018 at 10:37 AM:
Hi all,
I used a Flink window, and when the job begins we can get the results of the
window. But no results are issued after a few hours.
The job is still running with no errors, and the data that does not go through a
window is still all emitted.
By the way, I used Flink 1.3.2 and RAM to cache the distinct data for the
sliding window.

Yours,
September