[jira] [Commented] (FLINK-10216) Add REGEXP_MATCH in TableAPI and SQL

2018-08-27 Thread Juho Autio (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593557#comment-16593557
 ] 

Juho Autio commented on FLINK-10216:


[~yanghua] thanks, I had missed it. I wonder if REGEXP_MATCH (or 
REGEXP_MATCHES?) should still be included for completeness. I think it's common 
enough that users will often be looking for it.

> Add REGEXP_MATCH in TableAPI and SQL
> 
>
> Key: FLINK-10216
> URL: https://issues.apache.org/jira/browse/FLINK-10216
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Juho Autio
>Priority: Major
>
> Here's a naive implementation:
> {code:java}
> import org.apache.flink.table.functions.ScalarFunction;
>
> public class RegexpMatchFunction extends ScalarFunction {
>     // NOTE! Flink calls eval() by reflection
>     public boolean eval(String value, String pattern) {
>         return value != null && pattern != null && value.matches(pattern);
>     }
> }
> {code}
> I wonder if there would be a way to optimize this to use 
> {{Pattern.compile(pattern)}} and reuse the compiled Pattern across multiple calls 
> (possibly different values, but the same pattern).
> h3. Naming
> Should regex functions be prefixed with {{regexp_}} or {{regex_}}? See also: 
> [https://github.com/apache/flink/pull/6448#issuecomment-415972833]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10216) Add REGEXP_MATCH in TableAPI and SQL

2018-08-25 Thread Juho Autio (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Juho Autio updated FLINK-10216:
---
Description: 
Here's a naive implementation:
{code:java}
import org.apache.flink.table.functions.ScalarFunction;

public class RegexpMatchFunction extends ScalarFunction {
    // NOTE! Flink calls eval() by reflection
    public boolean eval(String value, String pattern) {
        return value != null && pattern != null && value.matches(pattern);
    }
}
{code}
I wonder if there would be a way to optimize this to use 
{{Pattern.compile(pattern)}} and reuse the compiled Pattern across multiple calls 
(possibly different values, but the same pattern).
h3. Naming

Should regex functions be prefixed with {{regexp_}} or {{regex_}}? See also: 
[https://github.com/apache/flink/pull/6448#issuecomment-415972833]

  was:
Here's a naive implementation:

{code:java}
import org.apache.flink.table.functions.ScalarFunction;

public class RegexpMatchFunction extends ScalarFunction {
    // NOTE! Flink calls eval() by reflection
    public boolean eval(String value, String pattern) {
        return value != null && pattern != null && value.matches(pattern);
    }
}
{code}

I wonder if there would be a way to optimize this to use 
{{Pattern.compile(pattern)}} and reuse the compiled Pattern across multiple calls 
(possibly different values, but the same pattern).


> Add REGEXP_MATCH in TableAPI and SQL
> 
>
> Key: FLINK-10216
> URL: https://issues.apache.org/jira/browse/FLINK-10216
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Juho Autio
>Priority: Major
>
> Here's a naive implementation:
> {code:java}
> import org.apache.flink.table.functions.ScalarFunction;
>
> public class RegexpMatchFunction extends ScalarFunction {
>     // NOTE! Flink calls eval() by reflection
>     public boolean eval(String value, String pattern) {
>         return value != null && pattern != null && value.matches(pattern);
>     }
> }
> {code}
> I wonder if there would be a way to optimize this to use 
> {{Pattern.compile(pattern)}} and reuse the compiled Pattern across multiple calls 
> (possibly different values, but the same pattern).
> h3. Naming
> Should regex functions be prefixed with {{regexp_}} or {{regex_}}? See also: 
> [https://github.com/apache/flink/pull/6448#issuecomment-415972833]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10216) Add REGEXP_MATCH in TableAPI and SQL

2018-08-25 Thread Juho Autio (JIRA)
Juho Autio created FLINK-10216:
--

 Summary: Add REGEXP_MATCH in TableAPI and SQL
 Key: FLINK-10216
 URL: https://issues.apache.org/jira/browse/FLINK-10216
 Project: Flink
  Issue Type: Sub-task
Reporter: Juho Autio


Here's a naive implementation:

{code:java}
import org.apache.flink.table.functions.ScalarFunction;

public class RegexpMatchFunction extends ScalarFunction {
    // NOTE! Flink calls eval() by reflection
    public boolean eval(String value, String pattern) {
        return value != null && pattern != null && value.matches(pattern);
    }
}
{code}

I wonder if there would be a way to optimize this to use 
{{Pattern.compile(pattern)}} and reuse the compiled Pattern across multiple calls 
(possibly different values, but the same pattern).
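
For example, here's a minimal sketch of what such caching could look like (an illustration only, not a proposed final implementation; the class name is made up, and it assumes the {{org.apache.flink.table.functions.ScalarFunction}} base class with a simple per-instance cache):

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.flink.table.functions.ScalarFunction;

public class CachingRegexpMatchFunction extends ScalarFunction {

    // Cache of compiled patterns, keyed by the pattern string. A function instance
    // is typically used by a single parallel task, so a plain HashMap suffices here.
    private transient Map<String, Pattern> patternCache;

    // NOTE! Flink calls eval() by reflection
    public boolean eval(String value, String pattern) {
        if (value == null || pattern == null) {
            return false;
        }
        if (patternCache == null) {
            patternCache = new HashMap<>();
        }
        Pattern compiled = patternCache.computeIfAbsent(pattern, Pattern::compile);
        return compiled.matcher(value).matches();
    }
}
{code}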



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9335) Expose client logs via Flink UI

2018-05-11 Thread Juho Autio (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471734#comment-16471734
 ] 

Juho Autio commented on FLINK-9335:
---

I'm submitting jobs on the command line with {{flink run}} in yarn-cluster mode.

> Expose client logs via Flink UI
> ---
>
> Key: FLINK-9335
> URL: https://issues.apache.org/jira/browse/FLINK-9335
> Project: Flink
>  Issue Type: New Feature
>Reporter: Juho Autio
>Priority: Minor
>
> The logs logged by my Flink job jar _before_ *env.execute* can't be found in 
> jobmanager log in Flink UI.
> In my case they seem to be going to 
> /+home/hadoop/flink-1.5-SNAPSHOT/log/flink-hadoop-client-ip-10-0-10-29.log,+ 
> for example.
> [~fhueske] said:
> {quote}It seems like good feature request to include the client logs.
> {quote}
> Implementing this may not be as trivial as just reading another log file 
> though. As [~fhueske] commented:
> {quote}I assume that these logs are generated from a different process, i.e., 
> the client process and not the JM or TM process.
>  Hence, they end up in a different log file and are not covered by the log 
> collection of the UI.
>  The reason is that *this process might also be run on a machine outside of 
> the cluster. So the client log file might not be accessible from the UI 
> process*. 
> {quote}
> This was discussed on the user mailing list:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Application-logs-missing-from-jobmanager-log-tp19830p19882.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9335) Expose client logs via Flink UI

2018-05-11 Thread Juho Autio (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Juho Autio updated FLINK-9335:
--
Description: 
The logs logged by my Flink job jar _before_ *env.execute* can't be found in 
jobmanager log in Flink UI.

In my case they seem to be going to 
/+home/hadoop/flink-1.5-SNAPSHOT/log/flink-hadoop-client-ip-10-0-10-29.log,+ 
for example.

[~fhueske] said:
{quote}It seems like good feature request to include the client logs.
{quote}
Implementing this may not be as trivial as just reading another log file 
though. As [~fhueske] commented:
{quote}I assume that these logs are generated from a different process, i.e., 
the client process and not the JM or TM process.
 Hence, they end up in a different log file and are not covered by the log 
collection of the UI.
 The reason is that *this process might also be run on a machine outside of the 
cluster. So the client log file might not be accessible from the UI process*. 
{quote}
This was discussed on the user mailing list:

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Application-logs-missing-from-jobmanager-log-tp19830p19882.html

  was:
The logs logged by my Flink job jar _before_ *env.execute* can't be found in 
jobmanager log in Flink UI.

In my case they seem to be going to 
/+home/hadoop/flink-1.5-SNAPSHOT/log/flink-hadoop-client-ip-10-0-10-29.log,+ 
for example.

[~fhueske] said:
{quote}It seems like good feature request to include the client logs.{quote}
Implementing this may not be as trivial as just reading another log file 
though. As [~fhueske] commented:
{quote}I assume that these logs are generated from a different process, i.e., 
the client process and not the JM or TM process.
Hence, they end up in a different log file and are not covered by the log 
collection of the UI.
The reason is that *this process might also be run on a machine outside of the 
cluster. So the client log file might not be accessible from the UI process*. 
{quote}


> Expose client logs via Flink UI
> ---
>
> Key: FLINK-9335
> URL: https://issues.apache.org/jira/browse/FLINK-9335
> Project: Flink
>  Issue Type: Improvement
>Reporter: Juho Autio
>Priority: Major
>
> The logs logged by my Flink job jar _before_ *env.execute* can't be found in 
> jobmanager log in Flink UI.
> In my case they seem to be going to 
> /+home/hadoop/flink-1.5-SNAPSHOT/log/flink-hadoop-client-ip-10-0-10-29.log,+ 
> for example.
> [~fhueske] said:
> {quote}It seems like good feature request to include the client logs.
> {quote}
> Implementing this may not be as trivial as just reading another log file 
> though. As [~fhueske] commented:
> {quote}I assume that these logs are generated from a different process, i.e., 
> the client process and not the JM or TM process.
>  Hence, they end up in a different log file and are not covered by the log 
> collection of the UI.
>  The reason is that *this process might also be run on a machine outside of 
> the cluster. So the client log file might not be accessible from the UI 
> process*. 
> {quote}
> This was discussed on the user mailing list:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Application-logs-missing-from-jobmanager-log-tp19830p19882.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9335) Expose client logs via Flink UI

2018-05-11 Thread Juho Autio (JIRA)
Juho Autio created FLINK-9335:
-

 Summary: Expose client logs via Flink UI
 Key: FLINK-9335
 URL: https://issues.apache.org/jira/browse/FLINK-9335
 Project: Flink
  Issue Type: Improvement
Reporter: Juho Autio


The logs logged by my Flink job jar _before_ *env.execute* can't be found in 
jobmanager log in Flink UI.

In my case they seem to be going to 
/+home/hadoop/flink-1.5-SNAPSHOT/log/flink-hadoop-client-ip-10-0-10-29.log,+ 
for example.

[~fhueske] said:
{quote}It seems like good feature request to include the client logs.{quote}
Implementing this may not be as trivial as just reading another log file 
though. As [~fhueske] commented:
{quote}I assume that these logs are generated from a different process, i.e., 
the client process and not the JM or TM process.
Hence, they end up in a different log file and are not covered by the log 
collection of the UI.
The reason is that *this process might also be run on a machine outside of the 
cluster. So the client log file might not be accessible from the UI process*. 
{quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9334) Docs to have a code snippet of Kafka partition discovery

2018-05-11 Thread Juho Autio (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Juho Autio updated FLINK-9334:
--
Description: 
Tzu-Li (Gordon) said:
{quote}Yes, it might be helpful to have a code snippet to demonstrate the 
configuration for partition discovery.
{quote}
 

*Background*
  
 The docs correctly say:
  
{quote}To enable it, set a non-negative value for 
+flink.partition-discovery.interval-millis+ in the _provided properties config_
{quote}
 
 So it should be set in the Properties that are passed in the constructor of 
FlinkKafkaConsumer.
  
 I had somehow assumed that this should go to flink-conf.yaml (maybe because it 
starts with "flink."?), and obviously the FlinkKafkaConsumer doesn't read that.
  
 A piece of example code might've helped me avoid this mistake. 
  
 This was discussed on the user mailing list:
 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Consumers-Partition-Discovery-doesn-t-work-tp19129p19484.html]

  was:
Tzu-Li (Gordon) said:
{quote}
Yes, it might be helpful to have a code snippet to demonstrate the 
configuration for partition discovery.
{quote}
 
 
The docs correctly say:
 
{quote}
To enable it, set a non-negative value for 
+flink.partition-discovery.interval-millis+ in the _provided properties config_
{quote}
 
So it should be set in the Properties that are passed in the constructor of 
FlinkKafkaConsumer.
 
I had somehow assumed that this should go to flink-conf.yaml (maybe because it 
starts with "flink."?), and obviously the FlinkKafkaConsumer doesn't read that.
 
A piece of example code might've helped me avoid this mistake.
 
 
This was discussed on the user mailing list:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Consumers-Partition-Discovery-doesn-t-work-tp19129p19484.html


> Docs to have a code snippet of Kafka partition discovery
> 
>
> Key: FLINK-9334
> URL: https://issues.apache.org/jira/browse/FLINK-9334
> Project: Flink
>  Issue Type: Improvement
>Reporter: Juho Autio
>Priority: Major
>
> Tzu-Li (Gordon) said:
> {quote}Yes, it might be helpful to have a code snippet to demonstrate the 
> configuration for partition discovery.
> {quote}
>  
> 
> *Background*
>   
>  The docs correctly say:
>   
> {quote}To enable it, set a non-negative value for 
> +flink.partition-discovery.interval-millis+ in the _provided properties 
> config_
> {quote}
>  
>  So it should be set in the Properties that are passed in the constructor of 
> FlinkKafkaConsumer.
>   
>  I had somehow assumed that this should go to flink-conf.yaml (maybe because 
> it starts with "flink."?), and obviously the FlinkKafkaConsumer doesn't read 
> that.
>   
>  A piece of example code might've helped me avoid this mistake. 
>   
>  This was discussed on the user mailing list:
>  
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Consumers-Partition-Discovery-doesn-t-work-tp19129p19484.html]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9334) Docs to have a code snippet of Kafka partition discovery

2018-05-11 Thread Juho Autio (JIRA)
Juho Autio created FLINK-9334:
-

 Summary: Docs to have a code snippet of Kafka partition discovery
 Key: FLINK-9334
 URL: https://issues.apache.org/jira/browse/FLINK-9334
 Project: Flink
  Issue Type: Improvement
Reporter: Juho Autio


Tzu-Li (Gordon) said:
{quote}
Yes, it might be helpful to have a code snippet to demonstrate the 
configuration for partition discovery.
{quote}
 
 
The docs correctly say:
 
{quote}
To enable it, set a non-negative value for 
+flink.partition-discovery.interval-millis+ in the _provided properties config_
{quote}
 
So it should be set in the Properties that are passed in the constructor of 
FlinkKafkaConsumer.
 
I had somehow assumed that this should go to flink-conf.yaml (maybe because it 
starts with "flink."?), and obviously the FlinkKafkaConsumer doesn't read that.
 
A piece of example code might've helped me avoid this mistake.
 
 
This was discussed on the user mailing list:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Consumers-Partition-Discovery-doesn-t-work-tp19129p19484.html
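
For reference, a sketch of the kind of snippet the docs could include (the topic name, group id and interval below are made-up example values, and the 0.10 consumer is used just as an example):

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class PartitionDiscoveryExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // example value
        props.setProperty("group.id", "my-consumer-group");       // example value
        // Partition discovery is enabled via the consumer Properties, NOT via flink-conf.yaml.
        // Any non-negative value (milliseconds) enables it; 30000 is just an example interval.
        props.setProperty("flink.partition-discovery.interval-millis", "30000");

        env.addSource(new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props))
            .print();

        env.execute("partition-discovery-example");
    }
}
{code}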



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9184) Remove warning from kafka consumer docs

2018-04-16 Thread Juho Autio (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Juho Autio updated FLINK-9184:
--
Description: Once the main problem of FLINK-5479 has been fixed, remove the 
warning about idle partitions from the docs.  (was: Once the main problem of 
FLINK-5479 has been fixed, remove the warning about idle from the docs.)

> Remove warning from kafka consumer docs
> ---
>
> Key: FLINK-9184
> URL: https://issues.apache.org/jira/browse/FLINK-9184
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Juho Autio
>Priority: Major
>
> Once the main problem of FLINK-5479 has been fixed, remove the warning about 
> idle partitions from the docs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9184) Remove warning from kafka consumer docs

2018-04-16 Thread Juho Autio (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Juho Autio updated FLINK-9184:
--
Description: Once the main problem (FLINK-5479) has been fixed, remove the 
warning about idle partitions from the docs.  (was: Once the main problem of 
FLINK-5479 has been fixed, remove the warning about idle partitions from the 
docs.)

> Remove warning from kafka consumer docs
> ---
>
> Key: FLINK-9184
> URL: https://issues.apache.org/jira/browse/FLINK-9184
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Juho Autio
>Priority: Major
>
> Once the main problem (FLINK-5479) has been fixed, remove the warning about 
> idle partitions from the docs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9184) Remove warning from kafka consumer docs

2018-04-16 Thread Juho Autio (JIRA)
Juho Autio created FLINK-9184:
-

 Summary: Remove warning from kafka consumer docs
 Key: FLINK-9184
 URL: https://issues.apache.org/jira/browse/FLINK-9184
 Project: Flink
  Issue Type: Sub-task
Reporter: Juho Autio


Once the main problem of FLINK-5479 has been fixed, remove the warning about 
idle from the docs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9183) Kafka consumer docs to warn about idle partitions

2018-04-16 Thread Juho Autio (JIRA)
Juho Autio created FLINK-9183:
-

 Summary: Kafka consumer docs to warn about idle partitions
 Key: FLINK-9183
 URL: https://issues.apache.org/jira/browse/FLINK-9183
 Project: Flink
  Issue Type: Sub-task
Reporter: Juho Autio


Looks like the bug FLINK-5479 entirely prevents 
FlinkKafkaConsumerBase#assignTimestampsAndWatermarks from being used if there are 
any idle partitions. It would be nice to mention in the documentation that 
currently this requires all subscribed partitions to have a constant stream of 
data with growing timestamps: when the watermark gets stalled on an idle 
partition, it blocks everything.
 
Link to current documentation:
[https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission]
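
To make this concrete, here's roughly the setup the warning would apply to (a sketch only; the topic, group id and the way the timestamp is parsed from the record are made up):

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class PerPartitionWatermarksExample {

    public static FlinkKafkaConsumer010<String> createConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // example value
        props.setProperty("group.id", "my-consumer-group");       // example value

        FlinkKafkaConsumer010<String> consumer =
            new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);

        // Per-partition watermarks: each Kafka partition gets its own watermark and the
        // source emits the minimum of them. If one subscribed partition never receives
        // data, its watermark stays at Long.MIN_VALUE and holds back the whole source.
        consumer.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(String element) {
                    // example only: assume the record is an epoch-millis timestamp string
                    return Long.parseLong(element);
                }
            });

        return consumer;
    }
}
{code}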



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions

2018-04-12 Thread Juho Autio (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16435048#comment-16435048
 ] 

Juho Autio commented on FLINK-5479:
---

[~tzulitai]:
{quote}Raised this to blocker for 1.5.0, as we are seeing more users with this 
issue on the mailing lists.
{quote}
Is a fix for this going to make it to 1.5.0, or has it been postponed to 1.6.0? 
This is a big problem for us in a job where we need to consume many topics with 
a catch-all topic name pattern, and many topics/partitions can remain idle for 
a long time – even forever.

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.6.0
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
> Similar to what's happening to idle sources blocking watermark progression in 
> downstream operators (see FLINK-5017), the per-partition watermark mechanism 
> in {{FlinkKafkaConsumer}} is also being blocked of progressing watermarks 
> when a partition is idle. The watermark of idle partitions is always 
> {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions 
> of a consumer subtask will never proceed.
> It's normally not a common case to have Kafka partitions not producing any 
> data, but it'll probably be good to handle this as well. I think we should 
> have a localized solution similar to FLINK-5017 for the per-partition 
> watermarks in {{AbstractFetcher}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-3123) Allow setting custom start-offsets for the Kafka consumer

2016-11-11 Thread Juho Autio (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15657244#comment-15657244
 ] 

Juho Autio commented on FLINK-3123:
---

Nice one, looking forward to this!

{quote}
If a subscribed partition is not defined a specific offset (does not have a 
corresponding entry in the specificStartOffsets map), then the startup 
behaviour for that particular partition fallbacks to the default group offset 
behaviour (look for offset in ZK / Kafka for that partition, or use 
"auto.offset.reset" if none can be found).
{quote}

My use case is skipping to the latest offsets while keeping the same consumer 
group id. What happens if I define all topics & partitions with offset -1? 
Because that's the special value for fetching the latest offset (and -2 is 
earliest). I wonder if there's some code that translates -1 to 0 or -2, which 
would be a problem.

Even better in my case would be being able to just say 
consumer.setStartFromOffset(-1), which would automatically discover all 
partitions for all topics and initialize the offsets by fetching from Kafka 
with "current offset" -1. It would be nice if this reset the offsets even when 
a Flink snapshot state exists with some offset positions (I suppose that's how 
your setStartFromSpecificOffsets works anyway).
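
For my own reference, here's a sketch of how I'd expect to use the described API (illustration only; the consumer version, topic, partitions and offsets are made up, and the method name is taken from the description above):

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class SpecificOffsetsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // example value
        props.setProperty("group.id", "my-consumer-group");       // example value

        // Partitions listed here start from the given offsets; any other subscribed
        // partition falls back to the default group-offset behaviour.
        Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
        specificStartOffsets.put(new KafkaTopicPartition("my-topic", 0), 23L); // example offsets
        specificStartOffsets.put(new KafkaTopicPartition("my-topic", 1), 31L);

        FlinkKafkaConsumer09<String> consumer =
            new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props);
        consumer.setStartFromSpecificOffsets(specificStartOffsets);

        env.addSource(consumer).print();
        env.execute("specific-offsets-example");
    }
}
{code}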

> Allow setting custom start-offsets for the Kafka consumer
> -
>
> Key: FLINK-3123
> URL: https://issues.apache.org/jira/browse/FLINK-3123
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Robert Metzger
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.0.0, 1.2.0
>
>
> Currently, the Kafka consumer only allows to start reading from the earliest 
> available offset or the current offset.
> Sometimes, users want to set a specific start offset themselves.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3964) Job submission times out with recursive.file.enumeration

2016-06-23 Thread Juho Autio (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346263#comment-15346263
 ] 

Juho Autio commented on FLINK-3964:
---

Sorry I didn't realize that {{-m yarn-cluster}} was significant here – I also 
had that flag.

I'm a bit concerned that there won't be an easy way to find a good minimum 
value for the timeout. I would rather set a timeout for configuring a single 
vertex during submission, if that makes sense. This would mean that if 
configuring a single vertex takes too long, we get a timeout, but on the other 
hand a job that spawns many vertices will be able to configure all of them 
without timing out (as long as configuring each vertex doesn't time out). What 
do you think about that?

Also, if this happens, the error message could give a hint to increase 
{{akka.client.timeout}} (or should it just suggest {{-Dakka.ask.timeout}}?) and 
print the current value. That would make this much easier to fix when it happens.

> Job submission times out with recursive.file.enumeration
> 
>
> Key: FLINK-3964
> URL: https://issues.apache.org/jira/browse/FLINK-3964
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, DataSet API
>Affects Versions: 1.0.0
>Reporter: Juho Autio
>
> When using "recursive.file.enumeration" with a big enough folder structure to 
> list, flink batch job fails right at the beginning because of a timeout.
> h2. Problem details
> We get this error: {{Communication with JobManager failed: Job submission to 
> the JobManager timed out}}.
> The code we have is basically this:
> {code}
> val env = ExecutionEnvironment.getExecutionEnvironment
> val parameters = new Configuration
> // set the recursive enumeration parameter
> parameters.setBoolean("recursive.file.enumeration", true)
> val parameter = ParameterTool.fromArgs(args)
> val input_data_path : String = parameter.get("input_data_path", null )
> val data : DataSet[(Text,Text)] = env.readSequenceFile(classOf[Text], 
> classOf[Text], input_data_path)
> .withParameters(parameters)
> data.first(10).print
> {code}
> If we set {{input_data_path}} parameter to {{s3n://bucket/path/date=*/}} it 
> times out. If we use a more restrictive pattern like 
> {{s3n://bucket/path/date=20160523/}}, it doesn't time out.
> To me it seems that time taken to list files shouldn't cause any timeouts on 
> job submission level.
> For us this was "fixed" by adding {{akka.client.timeout: 600 s}} in 
> {{flink-conf.yaml}}, but I wonder if the timeout would still occur if we have 
> even more files to list?
> 
> P.S. Is there any way to set {{akka.client.timeout}} when calling {{bin/flink 
> run}} instead of editing {{flink-conf.yaml}}. I tried to add it as a {{-yD}} 
> flag but couldn't get it working.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3964) Job submission times out with recursive.file.enumeration

2016-06-21 Thread Juho Autio (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341594#comment-15341594
 ] 

Juho Autio commented on FLINK-3964:
---

[~mxm], I'm pretty sure I tried {{bin/flink run -yDakka.client.timeout=600s}} 
and it didn't seem to have any effect. Did you intentionally suggest _ask_ 
timeout instead of _client_ timeout? Do those have different scopes somehow?

And just in general, should there be any difference between providing a 
parameter with {{-yD}} vs. setting it in {{flink-conf.yaml}}? Does it make 
sense that a parameter would not be applied if given with {{-yD}}?

> Job submission times out with recursive.file.enumeration
> 
>
> Key: FLINK-3964
> URL: https://issues.apache.org/jira/browse/FLINK-3964
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, DataSet API
>Affects Versions: 1.0.0
>Reporter: Juho Autio
>
> When using "recursive.file.enumeration" with a big enough folder structure to 
> list, flink batch job fails right at the beginning because of a timeout.
> h2. Problem details
> We get this error: {{Communication with JobManager failed: Job submission to 
> the JobManager timed out}}.
> The code we have is basically this:
> {code}
> val env = ExecutionEnvironment.getExecutionEnvironment
> val parameters = new Configuration
> // set the recursive enumeration parameter
> parameters.setBoolean("recursive.file.enumeration", true)
> val parameter = ParameterTool.fromArgs(args)
> val input_data_path : String = parameter.get("input_data_path", null )
> val data : DataSet[(Text,Text)] = env.readSequenceFile(classOf[Text], 
> classOf[Text], input_data_path)
> .withParameters(parameters)
> data.first(10).print
> {code}
> If we set {{input_data_path}} parameter to {{s3n://bucket/path/date=*/}} it 
> times out. If we use a more restrictive pattern like 
> {{s3n://bucket/path/date=20160523/}}, it doesn't time out.
> To me it seems that time taken to list files shouldn't cause any timeouts on 
> job submission level.
> For us this was "fixed" by adding {{akka.client.timeout: 600 s}} in 
> {{flink-conf.yaml}}, but I wonder if the timeout would still occur if we have 
> even more files to list?
> 
> P.S. Is there any way to set {{akka.client.timeout}} when calling {{bin/flink 
> run}} instead of editing {{flink-conf.yaml}}. I tried to add it as a {{-yD}} 
> flag but couldn't get it working.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2672) Add partitioned output format to HDFS RollingSink

2016-05-25 Thread Juho Autio (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15299557#comment-15299557
 ] 

Juho Autio commented on FLINK-2672:
---

I wonder what the full pattern syntax would be?

To provide full expressivity I think it would be good to extend RollingSink's 
Bucketer interface to take the element as an argument. This would allow 
implementing a custom bucketer that formats paths like you describe, and more.
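
Something along these lines is what I mean (a hypothetical sketch only; the interface name, class name and tuple layout below are made up, this is not the current RollingSink API):

{code:java}
import java.io.Serializable;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;

// Hypothetical element-aware bucketer: the sink would call this per element
// to decide which bucket directory the element is written to.
interface ElementAwareBucketer<T> extends Serializable {
    Path getBucketPath(Path basePath, T element);
}

// Example implementation for (timestamp, status) tuples, producing paths like
// /somepath/<timestamp>/<status> as described in this ticket.
class TimestampStatusBucketer implements ElementAwareBucketer<Tuple2<String, String>> {
    @Override
    public Path getBucketPath(Path basePath, Tuple2<String, String> element) {
        return new Path(basePath, element.f0 + "/" + element.f1);
    }
}
{code}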

> Add partitioned output format to HDFS RollingSink
> -
>
> Key: FLINK-2672
> URL: https://issues.apache.org/jira/browse/FLINK-2672
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 0.10.0
>Reporter: Mohamed Amine ABDESSEMED
>Priority: Minor
>  Labels: features
>
> An interesting use case of the HDFS Sink is to dispatch data into multiple 
> directories depending of attributes present in the source data.
> For example, for some data with a timestamp and a status fields, we want to 
> write it into different directories using a pattern like : 
> /somepath/%{timestamp}/%{status}
> The expected results are somethings like: 
> /somepath/some_timestamp/wellformed
> /somepath/some_timestamp/malformed
> /somepath/some_timestamp/incomplete 
> ... 
> etc
> To support this functionality the bucketing and checkpointing logics need to 
> be changed. 
> Note: For now, this can be done using the current version of the Rolling HDFS 
> Sink (https://github.com/apache/flink/pull/1084) with the help of splitting 
> data streams and having multiple HDFS sinks  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3961) Possibility to write output to multiple locations based on partitions

2016-05-25 Thread Juho Autio (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15299554#comment-15299554
 ] 

Juho Autio commented on FLINK-3961:
---

This question is primarily about Flink Batch API.

> Possibility to write output to multiple locations based on  partitions
> --
>
> Key: FLINK-3961
> URL: https://issues.apache.org/jira/browse/FLINK-3961
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.0.2
>Reporter: Kirsti Laurila
>Priority: Minor
>
> At the moment, one cannot write output to different folders based on 
> partitions based on data e.g. if data is partitioned by day, there is no 
> direct way to write data to 
> path/date=20160520/data
> path/date=20160521/data
> ...
> Add support for this, preferably for all write methods (write, 
> writeAsCsv, etc.).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3964) Job submission times out with recursive.file.enumeration

2016-05-24 Thread Juho Autio (JIRA)
Juho Autio created FLINK-3964:
-

 Summary: Job submission times out with recursive.file.enumeration
 Key: FLINK-3964
 URL: https://issues.apache.org/jira/browse/FLINK-3964
 Project: Flink
  Issue Type: Bug
Reporter: Juho Autio


When using "recursive.file.enumeration" with a big enough folder structure to 
list, flink batch job fails right at the beginning because of a timeout.

h2. Problem details

We get this error: {{Communication with JobManager failed: Job submission to 
the JobManager timed out}}.

The code we have is basically this:

{code}
val env = ExecutionEnvironment.getExecutionEnvironment

val parameters = new Configuration

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true)

val parameter = ParameterTool.fromArgs(args)

val input_data_path : String = parameter.get("input_data_path", null )

val data : DataSet[(Text,Text)] = env.readSequenceFile(classOf[Text], 
classOf[Text], input_data_path)
.withParameters(parameters)

data.first(10).print
{code}

If we set {{input_data_path}} parameter to {{s3n://bucket/path/date=*/}} it 
times out. If we use a more restrictive pattern like 
{{s3n://bucket/path/date=20160523/}}, it doesn't time out.

To me it seems that time taken to list files shouldn't cause any timeouts on 
job submission level.

For us this was "fixed" by adding {{akka.client.timeout: 600 s}} in 
{{flink-conf.yaml}}, but I wonder if the timeout would still occur if we have 
even more files to list?



P.S. Is there any way to set {{akka.client.timeout}} when calling {{bin/flink 
run}} instead of editing {{flink-conf.yaml}}. I tried to add it as a {{-yD}} 
flag but couldn't get it working.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)