[jira] [Commented] (FLINK-10216) Add REGEXP_MATCH in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-10216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593557#comment-16593557 ] Juho Autio commented on FLINK-10216: [~yanghua] thanks, I had missed it. I wonder if REGEXP_MATCH (or REGEXP_MATCHES?) should still be included for completeness. I think it's common enough for users to be looking for it quite often. > Add REGEXP_MATCH in TableAPI and SQL > > > Key: FLINK-10216 > URL: https://issues.apache.org/jira/browse/FLINK-10216 > Project: Flink > Issue Type: Sub-task >Reporter: Juho Autio >Priority: Major > > Here's a naive implementation: > {code:java} > public class RegexpMatchFunction extends ScalarFunction { > // NOTE! Flink calls eval() by reflection > public boolean eval(String value, String pattern) { > return value != null && pattern != null && value.matches(pattern); > } > } > {code} > I wonder if there would be a way to optimize this to use > {{Pattern.compile(value)}} and use the compiled Pattern for multiple calls > (possibly different values, but same pattern). > h3. Naming > Should regex functions be prefixed with {{regexp_}} or {{regex_}}? See also: > [https://github.com/apache/flink/pull/6448#issuecomment-415972833] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10216) Add REGEXP_MATCH in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-10216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Juho Autio updated FLINK-10216: --- Description: Here's a naive implementation: {code:java} public class RegexpMatchFunction extends ScalarFunction { // NOTE! Flink calls eval() by reflection public boolean eval(String value, String pattern) { return value != null && pattern != null && value.matches(pattern); } } {code} I wonder if there would be a way to optimize this to use {{Pattern.compile(value)}} and use the compiled Pattern for multiple calls (possibly different values, but same pattern). h3. Naming Should regex functions be prefixed with {{regexp_}} or {{regex_}}? See also: [https://github.com/apache/flink/pull/6448#issuecomment-415972833] was: Here's a naive implementation: {code:java} public class RegexpMatchFunction extends ScalarFunction { // NOTE! Flink calls eval() by reflection public boolean eval(String value, String pattern) { return value != null && pattern != null && value.matches(pattern); } } {code} I wonder if there would be a way to optimize this to use {{Pattern.compile(value)}} and use the compiled Pattern for multiple calls (possibly different values, but same pattern). > Add REGEXP_MATCH in TableAPI and SQL > > > Key: FLINK-10216 > URL: https://issues.apache.org/jira/browse/FLINK-10216 > Project: Flink > Issue Type: Sub-task >Reporter: Juho Autio >Priority: Major > > Here's a naive implementation: > {code:java} > public class RegexpMatchFunction extends ScalarFunction { > // NOTE! Flink calls eval() by reflection > public boolean eval(String value, String pattern) { > return value != null && pattern != null && value.matches(pattern); > } > } > {code} > I wonder if there would be a way to optimize this to use > {{Pattern.compile(value)}} and use the compiled Pattern for multiple calls > (possibly different values, but same pattern). > h3. Naming > Should regex functions be prefixed with {{regexp_}} or {{regex_}}? 
> See also: [https://github.com/apache/flink/pull/6448#issuecomment-415972833] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10216) Add REGEXP_MATCH in TableAPI and SQL
Juho Autio created FLINK-10216: -- Summary: Add REGEXP_MATCH in TableAPI and SQL Key: FLINK-10216 URL: https://issues.apache.org/jira/browse/FLINK-10216 Project: Flink Issue Type: Sub-task Reporter: Juho Autio Here's a naive implementation: {code:java} public class RegexpMatchFunction extends ScalarFunction { // NOTE! Flink calls eval() by reflection public boolean eval(String value, String pattern) { return value != null && pattern != null && value.matches(pattern); } } {code} I wonder if there would be a way to optimize this to use {{Pattern.compile(value)}} and use the compiled Pattern for multiple calls (possibly different values, but same pattern). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
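The optimization the issue asks about (reusing a compiled Pattern across calls that share the same pattern string) can be sketched with a small cache. This is only an illustration of the idea: the ScalarFunction base class is dropped so the snippet compiles standalone, and the class is not Flink's actual implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

// Sketch of a REGEXP_MATCH-style function that caches compiled patterns so
// repeated calls with the same pattern string reuse the compiled Pattern.
// In a real Flink UDF this class would extend ScalarFunction and Flink would
// call eval() by reflection; the base class is omitted to keep it standalone.
class RegexpMatchFunction {

    // Cache keyed by the pattern string; ConcurrentHashMap keeps the cache
    // safe if the function instance is shared across threads.
    private final Map<String, Pattern> cache = new ConcurrentHashMap<>();

    public boolean eval(String value, String pattern) {
        if (value == null || pattern == null) {
            return false;
        }
        // computeIfAbsent compiles the pattern only on first use.
        Pattern p = cache.computeIfAbsent(pattern, Pattern::compile);
        // matches() requires the whole input to match, like String.matches().
        return p.matcher(value).matches();
    }
}
```

Since `String.matches(pattern)` also performs a full match, the cached version preserves the naive semantics. An unbounded cache is fine when the pattern is a constant in the query; for arbitrary user-supplied patterns a bounded cache would be safer.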
[jira] [Commented] (FLINK-9335) Expose client logs via Flink UI
[ https://issues.apache.org/jira/browse/FLINK-9335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471734#comment-16471734 ] Juho Autio commented on FLINK-9335: --- I'm submitting jobs on command line with flink run, yarn-cluster mode. > Expose client logs via Flink UI > --- > > Key: FLINK-9335 > URL: https://issues.apache.org/jira/browse/FLINK-9335 > Project: Flink > Issue Type: New Feature >Reporter: Juho Autio >Priority: Minor > > The logs logged by my Flink job jar _before_ *env.execute* can't be found in > jobmanager log in Flink UI. > In my case they seem to be going to > /+home/hadoop/flink-1.5-SNAPSHOT/log/flink-hadoop-client-ip-10-0-10-29.log,+ > for example. > [~fhueske] said: > {quote}It seems like good feature request to include the client logs. > {quote} > Implementing this may not be as trivial as just reading another log file > though. As [~fhueske] commented: > {quote}I assume that these logs are generated from a different process, i.e., > the client process and not the JM or TM process. > Hence, they end up in a different log file and are not covered by the log > collection of the UI. > The reason is that *this process might also be run on a machine outside of > the cluster. So the client log file might not be accessible from the UI > process*. > {quote} > This was discussed on the user mailing list: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Application-logs-missing-from-jobmanager-log-tp19830p19882.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9335) Expose client logs via Flink UI
[ https://issues.apache.org/jira/browse/FLINK-9335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Juho Autio updated FLINK-9335: -- Description: The logs logged by my Flink job jar _before_ *env.execute* can't be found in jobmanager log in Flink UI. In my case they seem to be going to /+home/hadoop/flink-1.5-SNAPSHOT/log/flink-hadoop-client-ip-10-0-10-29.log,+ for example. [~fhueske] said: {quote}It seems like good feature request to include the client logs. {quote} Implementing this may not be as trivial as just reading another log file though. As [~fhueske] commented: {quote}I assume that these logs are generated from a different process, i.e., the client process and not the JM or TM process. Hence, they end up in a different log file and are not covered by the log collection of the UI. The reason is that *this process might also be run on a machine outside of the cluster. So the client log file might not be accessible from the UI process*. {quote} This was discussed on the user mailing list: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Application-logs-missing-from-jobmanager-log-tp19830p19882.html was: The logs logged by my Flink job jar _before_ *env.execute* can't be found in jobmanager log in Flink UI. In my case they seem to be going to /+home/hadoop/flink-1.5-SNAPSHOT/log/flink-hadoop-client-ip-10-0-10-29.log,+ for example. [~fhueske] said: {quote}It seems like good feature request to include the client logs.{quote} Implementing this may not be as trivial as just reading another log file though. As [~fhueske] commented: {quote}I assume that these logs are generated from a different process, i.e., the client process and not the JM or TM process. Hence, they end up in a different log file and are not covered by the log collection of the UI. The reason is that *this process might also be run on a machine outside of the cluster. So the client log file might not be accessible from the UI process*. 
{quote} > Expose client logs via Flink UI > --- > > Key: FLINK-9335 > URL: https://issues.apache.org/jira/browse/FLINK-9335 > Project: Flink > Issue Type: Improvement >Reporter: Juho Autio >Priority: Major > > The logs logged by my Flink job jar _before_ *env.execute* can't be found in > jobmanager log in Flink UI. > In my case they seem to be going to > /+home/hadoop/flink-1.5-SNAPSHOT/log/flink-hadoop-client-ip-10-0-10-29.log,+ > for example. > [~fhueske] said: > {quote}It seems like good feature request to include the client logs. > {quote} > Implementing this may not be as trivial as just reading another log file > though. As [~fhueske] commented: > {quote}I assume that these logs are generated from a different process, i.e., > the client process and not the JM or TM process. > Hence, they end up in a different log file and are not covered by the log > collection of the UI. > The reason is that *this process might also be run on a machine outside of > the cluster. So the client log file might not be accessible from the UI > process*. > {quote} > This was discussed on the user mailing list: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Application-logs-missing-from-jobmanager-log-tp19830p19882.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9335) Expose client logs via Flink UI
Juho Autio created FLINK-9335: - Summary: Expose client logs via Flink UI Key: FLINK-9335 URL: https://issues.apache.org/jira/browse/FLINK-9335 Project: Flink Issue Type: Improvement Reporter: Juho Autio The logs logged by my Flink job jar _before_ *env.execute* can't be found in jobmanager log in Flink UI. In my case they seem to be going to /+home/hadoop/flink-1.5-SNAPSHOT/log/flink-hadoop-client-ip-10-0-10-29.log,+ for example. [~fhueske] said: {quote}It seems like good feature request to include the client logs.{quote} Implementing this may not be as trivial as just reading another log file though. As [~fhueske] commented: {quote}I assume that these logs are generated from a different process, i.e., the client process and not the JM or TM process. Hence, they end up in a different log file and are not covered by the log collection of the UI. The reason is that *this process might also be run on a machine outside of the cluster. So the client log file might not be accessible from the UI process*. {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9334) Docs to have a code snippet of Kafka partition discovery
[ https://issues.apache.org/jira/browse/FLINK-9334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Juho Autio updated FLINK-9334: -- Description: Tzu-Li (Gordon) said: {quote}Yes, it might be helpful to have a code snippet to demonstrate the configuration for partition discovery. {quote} *Background* The docs correctly say: {quote}To enable it, set a non-negative value for +flink.partition-discovery.interval-millis+ in the _provided properties config_ {quote} So it should be set in the Properties that are passed in the constructor of FlinkKafkaConsumer. I had somehow assumed that this should go to flink-conf.yaml (maybe because it starts with "flink."?), and obviously the FlinkKafkaConsumer doesn't read that. A piece of example code might've helped me avoid this mistake. This was discussed on the user mailing list: [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Consumers-Partition-Discovery-doesn-t-work-tp19129p19484.html] was: Tzu-Li (Gordon) said: {quote} Yes, it might be helpful to have a code snippet to demonstrate the configuration for partition discovery. {quote} The docs correctly say: {quote} To enable it, set a non-negative value for +flink.partition-discovery.interval-millis+ in the _provided properties config_ {quote} So it should be set in the Properties that are passed in the constructor of FlinkKafkaConsumer. I had somehow assumed that this should go to flink-conf.yaml (maybe because it starts with "flink."?), and obviously the FlinkKafkaConsumer doesn't read that. A piece of example code might've helped me avoid this mistake. 
This was discussed on the user mailing list: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Consumers-Partition-Discovery-doesn-t-work-tp19129p19484.html > Docs to have a code snippet of Kafka partition discovery > > > Key: FLINK-9334 > URL: https://issues.apache.org/jira/browse/FLINK-9334 > Project: Flink > Issue Type: Improvement >Reporter: Juho Autio >Priority: Major > > Tzu-Li (Gordon) said: > {quote}Yes, it might be helpful to have a code snippet to demonstrate the > configuration for partition discovery. > {quote} > > > *Background* > > The docs correctly say: > > {quote}To enable it, set a non-negative value for > +flink.partition-discovery.interval-millis+ in the _provided properties > config_ > {quote} > > So it should be set in the Properties that are passed in the constructor of > FlinkKafkaConsumer. > > I had somehow assumed that this should go to flink-conf.yaml (maybe because > it starts with "flink."?), and obviously the FlinkKafkaConsumer doesn't read > that. > > A piece of example code might've helped me avoid this mistake. > > This was discussed on the user mailing list: > > [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Consumers-Partition-Discovery-doesn-t-work-tp19129p19484.html] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9334) Docs to have a code snippet of Kafka partition discovery
Juho Autio created FLINK-9334: - Summary: Docs to have a code snippet of Kafka partition discovery Key: FLINK-9334 URL: https://issues.apache.org/jira/browse/FLINK-9334 Project: Flink Issue Type: Improvement Reporter: Juho Autio Tzu-Li (Gordon) said: {quote} Yes, it might be helpful to have a code snippet to demonstrate the configuration for partition discovery. {quote} The docs correctly say: {quote} To enable it, set a non-negative value for +flink.partition-discovery.interval-millis+ in the _provided properties config_ {quote} So it should be set in the Properties that are passed in the constructor of FlinkKafkaConsumer. I had somehow assumed that this should go to flink-conf.yaml (maybe because it starts with "flink."?), and obviously the FlinkKafkaConsumer doesn't read that. A piece of example code might've helped me avoid this mistake. This was discussed on the user mailing list: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Consumers-Partition-Discovery-doesn-t-work-tp19129p19484.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
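The code snippet the issue asks for could look roughly like the following. The broker address and group id are placeholders, and the consumer construction itself is only mentioned in the usage note so the fragment stays self-contained.

```java
import java.util.Properties;

// Sketch of the configuration the docs describe: the discovery interval goes
// into the Properties passed to the FlinkKafkaConsumer constructor, NOT into
// flink-conf.yaml. Broker address and group id below are placeholders.
class PartitionDiscoveryConfig {

    public static Properties kafkaProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-consumer-group");       // placeholder
        // Any non-negative value enables partition discovery (interval in ms).
        props.setProperty("flink.partition-discovery.interval-millis", "10000");
        return props;
    }
}
```

These properties would then be handed to the consumer constructor, along the lines of `new FlinkKafkaConsumer<>("topic", schema, kafkaProperties())`. The key point is that the `flink.`-prefixed property is read by the consumer from the provided Properties, not from the cluster configuration.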
[jira] [Updated] (FLINK-9184) Remove warning from kafka consumer docs
[ https://issues.apache.org/jira/browse/FLINK-9184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Juho Autio updated FLINK-9184: -- Description: Once the main problem of FLINK-5479 has been fixed, remove the warning about idle partitions from the docs. (was: Once the main problem of FLINK-5479 has been fixed, remove the warning about idle from the docs.) > Remove warning from kafka consumer docs > --- > > Key: FLINK-9184 > URL: https://issues.apache.org/jira/browse/FLINK-9184 > Project: Flink > Issue Type: Sub-task >Reporter: Juho Autio >Priority: Major > > Once the main problem of FLINK-5479 has been fixed, remove the warning about > idle partitions from the docs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9184) Remove warning from kafka consumer docs
[ https://issues.apache.org/jira/browse/FLINK-9184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Juho Autio updated FLINK-9184: -- Description: Once the main problem (FLINK-5479) has been fixed, remove the warning about idle partitions from the docs. (was: Once the main problem of FLINK-5479 has been fixed, remove the warning about idle partitions from the docs.) > Remove warning from kafka consumer docs > --- > > Key: FLINK-9184 > URL: https://issues.apache.org/jira/browse/FLINK-9184 > Project: Flink > Issue Type: Sub-task >Reporter: Juho Autio >Priority: Major > > Once the main problem (FLINK-5479) has been fixed, remove the warning about > idle partitions from the docs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9184) Remove warning from kafka consumer docs
Juho Autio created FLINK-9184: - Summary: Remove warning from kafka consumer docs Key: FLINK-9184 URL: https://issues.apache.org/jira/browse/FLINK-9184 Project: Flink Issue Type: Sub-task Reporter: Juho Autio Once the main problem of FLINK-5479 has been fixed, remove the warning about idle from the docs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9183) Kafka consumer docs to warn about idle partitions
Juho Autio created FLINK-9183: - Summary: Kafka consumer docs to warn about idle partitions Key: FLINK-9183 URL: https://issues.apache.org/jira/browse/FLINK-9183 Project: Flink Issue Type: Sub-task Reporter: Juho Autio Looks like the bug FLINK-5479 is entirely preventing FlinkKafkaConsumerBase#assignTimestampsAndWatermarks to be used if there are any idle partitions. It would be nice to mention in documentation that currently this requires all subscribed partitions to have a constant stream of data with growing timestamps. When watermark gets stalled on an idle partition it blocks everything. Link to current documentation: [https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16435048#comment-16435048 ] Juho Autio commented on FLINK-5479: --- [~tzulitai]: {quote}Raised this to blocker for 1.5.0, as we are seeing more users with this issue on the mailing lists. {quote} Is a fix for this going to make it to 1.5.0, or has it been postponed to 1.6.0? This is a big problem for us in a job where we need to consume many topics with a catch-all topic name pattern, and many topics/partitions can remain idle for a long time – even forever. > Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions > -- > > Key: FLINK-5479 > URL: https://issues.apache.org/jira/browse/FLINK-5479 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.6.0 > > > Reported in ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html > Similar to what's happening to idle sources blocking watermark progression in > downstream operators (see FLINK-5017), the per-partition watermark mechanism > in {{FlinkKafkaConsumer}} is also being blocked of progressing watermarks > when a partition is idle. The watermark of idle partitions is always > {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions > of a consumer subtask will never proceed. > It's normally not a common case to have Kafka partitions not producing any > data, but it'll probably be good to handle this as well. I think we should > have a localized solution similar to FLINK-5017 for the per-partition > watermarks in {{AbstractFetcher}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
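Why one idle partition stalls everything follows from the min computation the fetcher effectively performs: the subtask's watermark is the minimum over per-partition watermarks, and an idle partition pins that minimum at Long.MIN_VALUE. A minimal sketch of the mechanism (not the actual AbstractFetcher code):

```java
// Illustration of the blocking behavior described above: the overall
// watermark of a consumer subtask is the minimum of its per-partition
// watermarks. An idle partition that never emits records stays at
// Long.MIN_VALUE and therefore pins the overall watermark forever.
// This mirrors the mechanism, not the actual AbstractFetcher code.
class WatermarkMin {

    public static long overallWatermark(long[] perPartitionWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : perPartitionWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```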
[jira] [Commented] (FLINK-3123) Allow setting custom start-offsets for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-3123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15657244#comment-15657244 ] Juho Autio commented on FLINK-3123: --- Nice one, looking forward to this! {quote} If a subscribed partition is not defined a specific offset (does not have a corresponding entry in the specificStartOffsets map), then the startup behaviour for that particular partition fallbacks to the default group offset behaviour (look for offset in ZK / Kafka for that partition, or use "auto.offset.reset" if none can be found). {quote} My use case is skipping to the latest offsets while keeping the same consumer group id. What happens if I define all topics & partitions with offset -1? Because that's the special value for fetching the latest offset (and -2 is earliest). I wonder if there's some code that translates -1 to 0 or -2, which would be a problem. Even better in my case would be that I can just say: consumer.setStartFromOffset(-1) which would automatically discover all partitions for all topics and initialize the offsets by fetching from Kafka with "current offset" -1. It would be nice if this reset the offsets even when a Flink snapshot state exists with some offset positions (I suppose that's how your setStartFromSpecificOffsets works any way). > Allow setting custom start-offsets for the Kafka consumer > - > > Key: FLINK-3123 > URL: https://issues.apache.org/jira/browse/FLINK-3123 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Robert Metzger >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.0.0, 1.2.0 > > > Currently, the Kafka consumer only allows to start reading from the earliest > available offset or the current offset. > Sometimes, users want to set a specific start offset themselves. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
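The fallback behavior quoted above (partitions without an entry in the specific-offsets map revert to the group-offset behavior) can be sketched as a lookup with a sentinel. The names here are made up for illustration and are not the connector's API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the startup-offset resolution described in the
// comment: a partition with an entry in the specific-offsets map starts
// there; any other partition falls back to the group-offset behavior
// (represented by a sentinel value). Names are illustrative, not Flink's API.
class StartupOffsets {

    public static final long USE_GROUP_OFFSETS = Long.MIN_VALUE;

    private final Map<String, Long> specificOffsets = new HashMap<>();

    public void setSpecificOffset(String topicPartition, long offset) {
        specificOffsets.put(topicPartition, offset);
    }

    public long resolve(String topicPartition) {
        return specificOffsets.getOrDefault(topicPartition, USE_GROUP_OFFSETS);
    }
}
```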
[jira] [Commented] (FLINK-3964) Job submission times out with recursive.file.enumeration
[ https://issues.apache.org/jira/browse/FLINK-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346263#comment-15346263 ] Juho Autio commented on FLINK-3964: --- Sorry I didn't realize that {{-m yarn-cluster}} was significant here – I also had that flag. I'm a bit concerned that there won't be an easy way to find a good minimum value for the timeout. I would rather set a timeout for configuring a single vertex during submission, if that makes sense. This would mean that if configuring a single vertex takes too long, we get a timeout, but on the other hand if we have a job that spawns many vertices, it will be able to configure all of them without timing out (as long as configuring each vertex doesn't time out). What do you think about that? Also if this happens, the error message could give a hint to increase {{akka.client.timeout}} (or should it just suggest {{-Dakka.ask.timeout}}?) and print the current value. It would make this much easier to fix if it happens. > Job submission times out with recursive.file.enumeration > > > Key: FLINK-3964 > URL: https://issues.apache.org/jira/browse/FLINK-3964 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats, DataSet API >Affects Versions: 1.0.0 >Reporter: Juho Autio > > When using "recursive.file.enumeration" with a big enough folder structure to > list, flink batch job fails right at the beginning because of a timeout. > h2. Problem details > We get this error: {{Communication with JobManager failed: Job submission to > the JobManager timed out}}. 
> The code we have is basically this: > {code} > val env = ExecutionEnvironment.getExecutionEnvironment > val parameters = new Configuration > // set the recursive enumeration parameter > parameters.setBoolean("recursive.file.enumeration", true) > val parameter = ParameterTool.fromArgs(args) > val input_data_path : String = parameter.get("input_data_path", null ) > val data : DataSet[(Text,Text)] = env.readSequenceFile(classOf[Text], > classOf[Text], input_data_path) > .withParameters(parameters) > data.first(10).print > {code} > If we set {{input_data_path}} parameter to {{s3n://bucket/path/date=*/}} it > times out. If we use a more restrictive pattern like > {{s3n://bucket/path/date=20160523/}}, it doesn't time out. > To me it seems that time taken to list files shouldn't cause any timeouts on > job submission level. > For us this was "fixed" by adding {{akka.client.timeout: 600 s}} in > {{flink-conf.yaml}}, but I wonder if the timeout would still occur if we have > even more files to list? > > P.S. Is there any way to set {{akka.client.timeout}} when calling {{bin/flink > run}} instead of editing {{flink-conf.yaml}}. I tried to add it as a {{-yD}} > flag but couldn't get it working. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3964) Job submission times out with recursive.file.enumeration
[ https://issues.apache.org/jira/browse/FLINK-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341594#comment-15341594 ] Juho Autio commented on FLINK-3964: --- [~mxm], I'm pretty sure I tried {{bin/flink run -yDakka.client.timeout=600s}} and it didn't seem to have any effect. Did you intentionally suggest _ask_ timeout instead of _client_ timeout? Do those have different scopes somehow? And just in general, should there be any difference between providing a parameter with {{-yD}} vs. setting it in {{flink-conf.yaml}}? Does it make sense that a parameter would not be applied if given with {{-yD}}? > Job submission times out with recursive.file.enumeration > > > Key: FLINK-3964 > URL: https://issues.apache.org/jira/browse/FLINK-3964 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats, DataSet API >Affects Versions: 1.0.0 >Reporter: Juho Autio > > When using "recursive.file.enumeration" with a big enough folder structure to > list, flink batch job fails right at the beginning because of a timeout. > h2. Problem details > We get this error: {{Communication with JobManager failed: Job submission to > the JobManager timed out}}. > The code we have is basically this: > {code} > val env = ExecutionEnvironment.getExecutionEnvironment > val parameters = new Configuration > // set the recursive enumeration parameter > parameters.setBoolean("recursive.file.enumeration", true) > val parameter = ParameterTool.fromArgs(args) > val input_data_path : String = parameter.get("input_data_path", null ) > val data : DataSet[(Text,Text)] = env.readSequenceFile(classOf[Text], > classOf[Text], input_data_path) > .withParameters(parameters) > data.first(10).print > {code} > If we set {{input_data_path}} parameter to {{s3n://bucket/path/date=*/}} it > times out. If we use a more restrictive pattern like > {{s3n://bucket/path/date=20160523/}}, it doesn't time out. 
> To me it seems that time taken to list files shouldn't cause any timeouts on > job submission level. > For us this was "fixed" by adding {{akka.client.timeout: 600 s}} in > {{flink-conf.yaml}}, but I wonder if the timeout would still occur if we have > even more files to list? > > P.S. Is there any way to set {{akka.client.timeout}} when calling {{bin/flink > run}} instead of editing {{flink-conf.yaml}}. I tried to add it as a {{-yD}} > flag but couldn't get it working. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2672) Add partitioned output format to HDFS RollingSink
[ https://issues.apache.org/jira/browse/FLINK-2672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15299557#comment-15299557 ] Juho Autio commented on FLINK-2672: --- I wonder what the full pattern syntax would be? To provide full expressivity I think it would be good to extend RollingSink's Bucketer interface to take the tuple as an argument. This would allow implementing a custom bucketer to format paths like you describe and more. > Add partitioned output format to HDFS RollingSink > - > > Key: FLINK-2672 > URL: https://issues.apache.org/jira/browse/FLINK-2672 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 0.10.0 >Reporter: Mohamed Amine ABDESSEMED >Priority: Minor > Labels: features > > An interesting use case of the HDFS Sink is to dispatch data into multiple > directories depending on attributes present in the source data. > For example, for some data with a timestamp and a status fields, we want to > write it into different directories using a pattern like : > /somepath/%{timestamp}/%{status} > The expected results are something like: > /somepath/some_timestamp/wellformed > /somepath/some_timestamp/malformed > /somepath/some_timestamp/incomplete > ... > etc > To support this functionality the bucketing and checkpointing logic needs to > be changed. > Note: For now, this can be done using the current version of the Rolling HDFS > Sink (https://github.com/apache/flink/pull/1084) with the help of splitting > data streams and having multiple HDFS sinks -- This message was sent by Atlassian JIRA (v6.3.4#6332)
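The extension proposed in the comment (a Bucketer that sees the element itself) can be sketched as below. The interface and record type here are stand-ins for illustration, not RollingSink's actual API.

```java
// Hypothetical sketch of an element-aware bucketer as proposed in the
// comment: the bucketer receives the record itself, so a path pattern
// like /somepath/%{timestamp}/%{status} can be resolved per element.
// The Bucketer interface and Record type are stand-ins, not RollingSink's
// actual API.
class ElementBucketer {

    public interface Bucketer<T> {
        String getBucketPath(String basePath, T element);
    }

    public static final class Record {
        final long timestamp;
        final String status;

        Record(long timestamp, String status) {
            this.timestamp = timestamp;
            this.status = status;
        }
    }

    // Resolves the /somepath/%{timestamp}/%{status} pattern per element.
    public static final Bucketer<Record> BY_TIMESTAMP_AND_STATUS =
        (basePath, r) -> basePath + "/" + r.timestamp + "/" + r.status;
}
```

With an element-aware interface like this, the "wellformed"/"malformed"/"incomplete" directories from the issue description fall out of a single sink rather than requiring split streams with one sink each.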
[jira] [Commented] (FLINK-3961) Possibility to write output to multiple locations based on partitions
[ https://issues.apache.org/jira/browse/FLINK-3961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15299554#comment-15299554 ] Juho Autio commented on FLINK-3961: --- This question is primarily about Flink Batch API. > Possibility to write output to multiple locations based on partitions > -- > > Key: FLINK-3961 > URL: https://issues.apache.org/jira/browse/FLINK-3961 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.0.2 >Reporter: Kirsti Laurila >Priority: Minor > > At the moment, one cannot write output to different folders based on > partitions based on data e.g. if data is partitioned by day, there is no > direct way to write data to > path/date=20160520/data > path/date=20160521/data > ... > Add support for this, preferably to all write possibilities (Write, > WriteAsCsv) etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3964) Job submission times out with recursive.file.enumeration
Juho Autio created FLINK-3964: - Summary: Job submission times out with recursive.file.enumeration Key: FLINK-3964 URL: https://issues.apache.org/jira/browse/FLINK-3964 Project: Flink Issue Type: Bug Reporter: Juho Autio When using "recursive.file.enumeration" with a big enough folder structure to list, flink batch job fails right at the beginning because of a timeout. h2. Problem details We get this error: {{Communication with JobManager failed: Job submission to the JobManager timed out}}. The code we have is basically this: {code} val env = ExecutionEnvironment.getExecutionEnvironment val parameters = new Configuration // set the recursive enumeration parameter parameters.setBoolean("recursive.file.enumeration", true) val parameter = ParameterTool.fromArgs(args) val input_data_path : String = parameter.get("input_data_path", null ) val data : DataSet[(Text,Text)] = env.readSequenceFile(classOf[Text], classOf[Text], input_data_path) .withParameters(parameters) data.first(10).print {code} If we set {{input_data_path}} parameter to {{s3n://bucket/path/date=*/}} it times out. If we use a more restrictive pattern like {{s3n://bucket/path/date=20160523/}}, it doesn't time out. To me it seems that time taken to list files shouldn't cause any timeouts on job submission level. For us this was "fixed" by adding {{akka.client.timeout: 600 s}} in {{flink-conf.yaml}}, but I wonder if the timeout would still occur if we have even more files to list? P.S. Is there any way to set {{akka.client.timeout}} when calling {{bin/flink run}} instead of editing {{flink-conf.yaml}}. I tried to add it as a {{-yD}} flag but couldn't get it working. -- This message was sent by Atlassian JIRA (v6.3.4#6332)