[jira] [Commented] (FLINK-17088) Search TaskManager log content based on keywords function
[ https://issues.apache.org/jira/browse/FLINK-17088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118246#comment-17118246 ] ambition commented on FLINK-17088: -- [~azagrebin] First, Ctrl + F is an extra keyboard operation that users have to perform. Second, Ctrl + F can only find matches after the entire log has been loaded in the browser, and loading the log in the browser is time-consuming, so users have to wait. > Search TaskManager log content based on keywords function > - > > Key: FLINK-17088 > URL: https://issues.apache.org/jira/browse/FLINK-17088 > Project: Flink > Issue Type: New Feature > Components: Runtime / REST, Runtime / Web Frontend >Affects Versions: 1.10.0 >Reporter: pingle wang >Priority: Minor > Labels: pull-request-available > Fix For: 1.11.0 > > Attachments: image-2020-04-11-01-30-09-551.png, > image-2020-04-11-01-30-21-681.png, image-2020-04-11-01-30-38-046.png > > Time Spent: 10m > Remaining Estimate: 0h > > A TaskManager log file usually contains a lot of content. There are several ways > to search it for keywords: > 1. Ctrl + F in the browser > 2. Shell commands on the server, such as grep -A > 3. Shipping the log files to Elasticsearch through Logstash / > Filebeat > All of these add extra work for the user, which is inconvenient, so this > search function was developed to query the log content. > Description: > 1. The "word" input box holds the search keyword; its default value is empty. > 2. The "lines" input box sets how many lines after each matching line are shown, > similar to 'grep -A 10 Exception'; its default value is 0. > 3. If the search finds nothing, 'No content matching the search > keywords' is displayed. > > Function display: > !image-2020-04-11-01-30-09-551.png! > !image-2020-04-11-01-30-21-681.png! > !image-2020-04-11-01-30-38-046.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
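The keyword search described above works like `grep -A`: a keyword, a number of trailing lines, and a fixed message when nothing matches. It can be sketched in Java as follows. This is a minimal, hypothetical helper, not the REST handler merged for FLINK-17088, and the class and method names are invented. Matching is a plain case-sensitive substring test. The issue does not say what an empty keyword should do, so treating it as "return the whole log" is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch of the FLINK-17088 search semantics (not Flink's implementation):
 * keep every line that contains the keyword plus the next `after` lines, like `grep -A`.
 */
class LogKeywordSearch {

    static final String NO_MATCH = "No content matching the search keywords";

    static List<String> search(List<String> logLines, String keyword, int after) {
        if (after < 0) {
            throw new IllegalArgumentException("after must be >= 0, got " + after);
        }
        // Assumption: an empty keyword (the UI default) disables filtering.
        if (keyword == null || keyword.isEmpty()) {
            return new ArrayList<>(logLines);
        }
        List<String> result = new ArrayList<>();
        int keepUntil = -1; // index of the last line inside the current match window
        for (int i = 0; i < logLines.size(); i++) {
            String line = logLines.get(i);
            if (line.contains(keyword)) {
                // A match starts (or extends) the window, so overlapping windows merge.
                keepUntil = i + after;
            }
            if (i <= keepUntil) {
                result.add(line);
            }
        }
        if (result.isEmpty()) {
            result.add(NO_MATCH);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList(
                "INFO  Task started",
                "ERROR java.lang.Exception: boom",
                "    at Foo.bar(Foo.java:1)",
                "    at Foo.main(Foo.java:2)",
                "INFO  Task finished");
        // Similar in spirit to `grep -A 1 Exception taskmanager.log`.
        search(log, "Exception", 1).forEach(System.out::println);
    }
}
```

Unlike `grep -A`, the sketch prints no `--` separators between non-adjacent groups. A server-side implementation would probably also read the log file incrementally rather than holding it in memory as a list.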
[jira] [Closed] (FLINK-11487) Support for writing data to Apache Flume
[ https://issues.apache.org/jira/browse/FLINK-11487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ambition closed FLINK-11487. Resolution: Fixed Release Note: [FLINK-4446] Remove Flume connector (now in Bahir) > Support for writing data to Apache Flume > > > Key: FLINK-11487 > URL: https://issues.apache.org/jira/browse/FLINK-11487 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.7.1 > Environment: JDK 1.8 > Scala 2.11 > Flink 1.7.1 > Apache Flume 1.6.0 >Reporter: ambition >Priority: Major > Labels: pull-request-available > Fix For: 1.7.2, 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Flume is a distributed, reliable, and available service for efficiently > collecting, aggregating, and moving large amounts of data, has many users. > Unfortunately, Flink does not currently support with data to Flume. > The following is the official website of flume and github source address: > [Apache Flume website|http://flume.apache.org/index.html] > [Apache Flume github|https://github.com/apache/flume] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11487) Support for writing data to Apache Flume
ambition created FLINK-11487: Summary: Support for writing data to Apache Flume Key: FLINK-11487 URL: https://issues.apache.org/jira/browse/FLINK-11487 Project: Flink Issue Type: New Feature Components: Streaming Connectors Affects Versions: 1.7.1 Environment: JDK 1.8 Scala 2.11 Flink 1.7.1 Apache Flume 1.6.0 Reporter: ambition Fix For: 1.7.2, 1.8.0 Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of data, and it has many users. Unfortunately, Flink does not currently support writing data to Flume. The official Flume website and GitHub source repository are: [Apache Flume website|http://flume.apache.org/index.html] [Apache Flume github|https://github.com/apache/flume] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11206) sql statement parser enhancement
[ https://issues.apache.org/jira/browse/FLINK-11206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725707#comment-16725707 ] ambition commented on FLINK-11206: -- Sorry, I have only just seen this doc: https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit# > sql statement parser enhancement > > > Key: FLINK-11206 > URL: https://issues.apache.org/jira/browse/FLINK-11206 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.2, 1.7.0 >Reporter: ambition >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > change calcite-core to calcite-server, use SqlDdlParserImpl to parse sql. > SqlDdlParserImpl extends its grammar on the basis of SqlParser, which > supports much more grammar than SqlParser, such as: > {code:java} > create or drop schema; > create or drop table; > create or drop view; > ...{code} > favorable for subsequent expansion and enrichment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11206) sql statement parser enhancement
[ https://issues.apache.org/jira/browse/FLINK-11206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ambition closed FLINK-11206. Resolution: Fixed > sql statement parser enhancement > > > Key: FLINK-11206 > URL: https://issues.apache.org/jira/browse/FLINK-11206 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.2, 1.7.0 >Reporter: ambition >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > change calcite-core to calcite-server, use SqlDdlParserImpl to parse sql. > SqlDdlParserImpl extends its grammar on the basis of SqlParser, which > supports much more grammar than SqlParser, such as: > {code:java} > create or drop schema; > create or drop table; > create or drop view; > ...{code} > favorable for subsequent expansion and enrichment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11206) sql statement parser enhancement
ambition created FLINK-11206: Summary: sql statement parser enhancement Key: FLINK-11206 URL: https://issues.apache.org/jira/browse/FLINK-11206 Project: Flink Issue Type: Improvement Components: Table API SQL Affects Versions: 1.7.0, 1.6.2 Reporter: ambition Replace the calcite-core dependency with calcite-server and use SqlDdlParserImpl to parse SQL. SqlDdlParserImpl extends the grammar of SqlParser and supports many more statements than SqlParser does, such as: {code:java} create or drop schema; create or drop table; create or drop view; ...{code} This makes it easier to extend and enrich the SQL support later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11039) LogicalExchange and HashPartitioner realization
[ https://issues.apache.org/jira/browse/FLINK-11039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ambition closed FLINK-11039. Resolution: Fixed > LogicalExchange and HashPartitioner realization > --- > > Key: FLINK-11039 > URL: https://issues.apache.org/jira/browse/FLINK-11039 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.2, 1.7.0 >Reporter: ambition >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0 > > > RelTimeIndicatorConverter.visit(exchange: LogicalExchange) function > realization. > FlinkLogicalExchange realization > org.apache.calcite.rel.logical.LogicalExchange. > HashPartitioner is Partitioner that implements hash-based partitioning using > Java's `Object.hashCode`. support org.apache.calcite.rel.RelDistribution.Type > HASH_DISTRIBUTED -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11039) LogicalExchange and HashPartitioner realization
[ https://issues.apache.org/jira/browse/FLINK-11039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ambition updated FLINK-11039: - Description: RelTimeIndicatorConverter.visit(exchange: LogicalExchange) function realization. FlinkLogicalExchange realization org.apache.calcite.rel.logical.LogicalExchange. HashPartitioner is Partitioner that implements hash-based partitioning using Java's `Object.hashCode`. support org.apache.calcite.rel.RelDistribution.Type HASH_DISTRIBUTED was: FlinkLogicalExchange realization org.apache.calcite.rel.logical.LogicalExchange. HashPartitioner is Partitioner that implements hash-based partitioning using Java's `Object.hashCode`. support org.apache.calcite.rel.RelDistribution.Type HASH_DISTRIBUTED > LogicalExchange and HashPartitioner realization > --- > > Key: FLINK-11039 > URL: https://issues.apache.org/jira/browse/FLINK-11039 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.2, 1.7.0 >Reporter: ambition >Priority: Minor > Fix For: 1.7.0 > > > RelTimeIndicatorConverter.visit(exchange: LogicalExchange) function > realization. > FlinkLogicalExchange realization > org.apache.calcite.rel.logical.LogicalExchange. > HashPartitioner is Partitioner that implements hash-based partitioning using > Java's `Object.hashCode`. support org.apache.calcite.rel.RelDistribution.Type > HASH_DISTRIBUTED -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11039) LogicalExchange and HashPartitioner realization
[ https://issues.apache.org/jira/browse/FLINK-11039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706717#comment-16706717 ] ambition commented on FLINK-11039: -- It was originally implemented as the RelTimeIndicatorConverter.visit(exchange: LogicalExchange) function. However, this function cannot be used directly in SQL statements. Every distributed engine has Shuffle/Hash/Range/Broadcast operators to exchange data with other daemons in the runtime system, so this change helps to support these operations. > LogicalExchange and HashPartitioner realization > --- > > Key: FLINK-11039 > URL: https://issues.apache.org/jira/browse/FLINK-11039 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.2, 1.7.0 >Reporter: ambition >Priority: Minor > Fix For: 1.7.0 > > > FlinkLogicalExchange realization > org.apache.calcite.rel.logical.LogicalExchange. > HashPartitioner is Partitioner that implements hash-based partitioning using > Java's `Object.hashCode`. support org.apache.calcite.rel.RelDistribution.Type > HASH_DISTRIBUTED -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11039) LogicalExchange and HashPartitioner realization
[ https://issues.apache.org/jira/browse/FLINK-11039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16705710#comment-16705710 ] ambition commented on FLINK-11039: -- Pull request: [PR7202|https://github.com/apache/flink/pull/7202] > LogicalExchange and HashPartitioner realization > --- > > Key: FLINK-11039 > URL: https://issues.apache.org/jira/browse/FLINK-11039 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.2, 1.7.0 >Reporter: ambition >Priority: Minor > Fix For: 1.7.0 > > > FlinkLogicalExchange realization > org.apache.calcite.rel.logical.LogicalExchange. > HashPartitioner is Partitioner that implements hash-based partitioning using > Java's `Object.hashCode`. support org.apache.calcite.rel.RelDistribution.Type > HASH_DISTRIBUTED -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11039) LogicalExchange and HashPartitioner realization
ambition created FLINK-11039: Summary: LogicalExchange and HashPartitioner realization Key: FLINK-11039 URL: https://issues.apache.org/jira/browse/FLINK-11039 Project: Flink Issue Type: Improvement Components: Table API SQL Affects Versions: 1.7.0, 1.6.2 Reporter: ambition Fix For: 1.7.0 Implement FlinkLogicalExchange for org.apache.calcite.rel.logical.LogicalExchange. HashPartitioner is a Partitioner that implements hash-based partitioning using Java's `Object.hashCode`; it supports org.apache.calcite.rel.RelDistribution.Type HASH_DISTRIBUTED. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
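A hash partitioner of the kind described here routes each record to `hashCode(key) mod numPartitions`, so equal keys always end up in the same partition. The sketch below is hypothetical and is not the class added in PR7202. To compile on its own, it declares a local `Partitioner<K>` interface with the same shape as Flink's `org.apache.flink.api.common.functions.Partitioner` (`int partition(K key, int numPartitions)`). It uses `Math.floorMod` because `hashCode()` may be negative.

```java
import java.util.Objects;

/** Local stand-in shaped like Flink's Partitioner<K>; declared here only to keep the sketch self-contained. */
interface Partitioner<K> {
    int partition(K key, int numPartitions);
}

/** Hypothetical hash partitioner: equal keys always map to the same partition via Object.hashCode. */
class HashPartitioner<K> implements Partitioner<K> {

    @Override
    public int partition(K key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive, got " + numPartitions);
        }
        // Objects.hashCode(null) == 0, so null keys land in partition 0.
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes,
        // where a plain `%` or Math.abs(Integer.MIN_VALUE) would misbehave.
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        HashPartitioner<String> partitioner = new HashPartitioner<>();
        for (String osType : new String[] {"ios", "android", "ios"}) {
            System.out.println(osType + " -> partition " + partitioner.partition(osType, 4));
        }
    }
}
```

`String.hashCode` is specified by the JDK, so every parallel instance computes the same target for a given key, which is what HASH_DISTRIBUTED relies on. Types whose hash code is identity-based, such as enums, are not stable across JVMs and would not be safe keys for this scheme in a distributed runtime.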
[jira] [Updated] (FLINK-10299) RowSerializer.copy data value cast exception and use checkpoint function Lead to Could not restart this job
[ https://issues.apache.org/jira/browse/FLINK-10299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ambition updated FLINK-10299: - Attachment: image-2018-11-19-10-40-14-577.png > RowSerializer.copy data value cast exception and use checkpoint function Lead > to Could not restart this job > --- > > Key: FLINK-10299 > URL: https://issues.apache.org/jira/browse/FLINK-10299 > Project: Flink > Issue Type: Bug > Components: Core, Table API SQL >Affects Versions: 1.6.0 >Reporter: ambition >Priority: Minor > Attachments: image-2018-09-07-17-47-04-343.png, > image-2018-11-19-10-40-14-577.png > > > Flink SQL processes collected user behavior data such as: > {code:java} > { > "event_id": "session_start", > "timestamp": "-", // error data > "viewport_height": "667", > "viewport_width": "-" // error data > } > {code} > which causes the following exception: > {code:java} > 2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO > executiongraph.ExecutionGraph (ExecutionGraph.java:tryRestartOrFail(1511)) - > Could not restart the job Flink Streaming Job > (6f0248219c631158f6e38f2dca0beb91) because the restart strategy prevented it.
> java.lang.ClassCastException: java.lang.String cannot be cast to > java.sql.Timestamp > at > org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer.copy(SqlTimestampSerializer.java:27) > at > org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95) > at > org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) > at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > 2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO > checkpoint.CheckpointCoordinator (CheckpointCoordinator.java:shutdown(320)) - > Stopping checkpoint coordinator for job 6f0248219c631158f6e38f2dca0beb91. > 2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO > checkpoint.StandaloneCompletedCheckpointStore > (StandaloneCompletedCheckpointStore.java:shutdown(102)) - Shutting down > {code} > With checkpointing enabled, this uncaught exception means the job could not be > restarted. As a workaround, fields whose malformed data causes the exception are set to null, as shown in the > image below. I hope a Flink committer can provide a better solution. > !image-2018-09-07-17-47-04-343.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
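The workaround described in this issue turns malformed values such as `"-"` into null before they reach a `Timestamp`-typed `Row` field. That way `SqlTimestampSerializer.copy` never receives a `String`. It could look roughly like the following. This is a hypothetical sketch, not code from the issue, and the class and method names are invented. It assumes timestamps arrive in the `yyyy-mm-dd hh:mm:ss[.f...]` format accepted by `java.sql.Timestamp.valueOf`. Whether null is an acceptable substitute depends on how the downstream SQL treats missing values.

```java
import java.sql.Timestamp;

/** Hypothetical helpers that map malformed input fields to null instead of failing the job. */
final class SafeFieldParser {

    private SafeFieldParser() {}

    /** Parses "yyyy-mm-dd hh:mm:ss[.f...]"; returns null for null, "-" or any other malformed value. */
    static Timestamp parseTimestampOrNull(String raw) {
        if (raw == null) {
            return null;
        }
        try {
            return Timestamp.valueOf(raw.trim());
        } catch (IllegalArgumentException e) {
            return null; // e.g. "timestamp": "-" in the sample event
        }
    }

    /** Parses an integer field such as viewport_width; returns null when the value is not a number. */
    static Integer parseIntOrNull(String raw) {
        if (raw == null) {
            return null;
        }
        try {
            return Integer.valueOf(raw.trim());
        } catch (NumberFormatException e) {
            return null; // e.g. "viewport_width": "-"
        }
    }

    public static void main(String[] args) {
        System.out.println(parseTimestampOrNull("2018-09-07 10:47:01"));
        System.out.println(parseTimestampOrNull("-"));
        System.out.println(parseIntOrNull("667"));
        System.out.println(parseIntOrNull("-"));
    }
}
```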
[jira] [Closed] (FLINK-10818) RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not enough free slots available to run the job.
[ https://issues.apache.org/jira/browse/FLINK-10818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ambition closed FLINK-10818. Resolution: Fixed > RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not > enough free slots available to run the job. > > > Key: FLINK-10818 > URL: https://issues.apache.org/jira/browse/FLINK-10818 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.6.2 > Environment: JDK 1.8 > Flink 1.6.0 > Hadoop 2.7.3 >Reporter: ambition >Priority: Major > Attachments: image-2018-11-12-11-05-38-159.png, > image-2018-11-12-11-06-33-387.png, image-2018-11-12-11-08-13-572.png, > jobmanager.log, taskmanager.log > > > Our production Flink on YARN job sets the restart strategy in code as > follows: > {code:java} > exeEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 1000L)); > {code} > After the job had been running for several days, the following exception occurred: > {code:java} > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > Not enough free slots available to run the job. You can decrease the operator > parallelism or increase the number of slots per TaskManager in the > configuration. Task to schedule: < Attempt #5 (Source: KafkaJsonTableSource > -> Map -> where: (AND(OR(=(app_key, _UTF-16LE'C4FAF9CE1569F541'), =(app_key, > _UTF-16LE'F5C7F68C7117630B'), =(app_key, _UTF-16LE'57C6FF4B5A064D29')), > OR(=(LOWER(TRIM(FLAG(BOTH), _UTF-16LE' ', os_type)), _UTF-16LE'ios'), > =(LOWER(TRIM(FLAG(BOTH), _UTF-16LE' ', os_type)), _UTF-16LE'android')), IS > NOT NULL(server_id))), select: (MT_Date_Format_Mode(receive_time, > _UTF-16LE'MMddHHmm', 10) AS date_p, LOWER(TRIM(FLAG(BOTH), _UTF-16LE' ', > os_type)) AS os_type, MT_Date_Format_Mode(receive_time, _UTF-16LE'HHmm', 10) > AS date_mm, server_id) (1/6)) @ (unassigned) - [SCHEDULED] > with groupID < > cbc357ccb763df2852fee8c4fc7d55f2 > in sharing group < > 690dbad267a8ff37c8cb5e9dbedd0a6d >.
Resources available to scheduler: Number > of instances=6, total number of slots=6, available slots=0 >at > org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:281) >at > org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:155) >at > org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$2(Execution.java:491) >at > org.apache.flink.runtime.executiongraph.Execution$$Lambda$44/1664178385.apply(Unknown > Source) >at > java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981) >at > java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2116) >at > org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:489) >at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.allocateResourcesForAll(ExecutionJobVertex.java:521) >at > org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleEager(ExecutionGraph.java:945) >at > org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:875) >at > org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1262) >at > org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59) >at > org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68) >at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >at java.util.concurrent.FutureTask.run(FutureTask.java:266) >at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) >at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) >at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >at > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >at java.lang.Thread.run(Thread.java:745) > {code} > > This exception happened when the job started. Related issue: > https://issues.apache.org/jira/browse/FLINK-4486 > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
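For reference, `RestartStrategies.fixedDelayRestart(5, 1000L)` allows up to 5 restart attempts with a fixed delay of 1000 ms between them. Once those attempts are used up, the job fails for good. The retry loop below only illustrates these semantics. It is a hypothetical stand-alone helper, not Flink's `FixedDelayRestartStrategy`. It also does not model slot allocation, which is where this report fails: the scheduler reported `Attempt #5` with 0 of 6 slots available.

```java
import java.util.concurrent.Callable;

/** Hypothetical illustration of fixed-delay restart semantics; not Flink's FixedDelayRestartStrategy. */
final class FixedDelayRetry {

    private FixedDelayRetry() {}

    /**
     * Runs the job; on failure waits delayMillis and restarts it, at most maxRestarts times.
     * The job therefore executes at most 1 + maxRestarts times in total.
     */
    static <T> T run(Callable<T> job, int maxRestarts, long delayMillis) throws Exception {
        int restarts = 0;
        while (true) {
            try {
                return job.call();
            } catch (Exception e) {
                if (restarts >= maxRestarts) {
                    // Restart budget exhausted, comparable to "the restart strategy prevented it".
                    throw e;
                }
                restarts++;
                Thread.sleep(delayMillis); // the same delay before every restart
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = run(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("simulated failure " + calls[0]);
            }
            return "running";
        }, 5, 1000L);
        System.out.println(result + " after " + calls[0] + " executions");
    }
}
```

The log above reports 6 instances with 6 slots in total and a task with parallelism 6 (`1/6`). Every restart therefore needs all the slots in the cluster, with no spare capacity.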
[jira] [Reopened] (FLINK-10818) RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not enough free slots available to run the job.
[ https://issues.apache.org/jira/browse/FLINK-10818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ambition reopened FLINK-10818: -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10818) RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not enough free slots available to run the job.
[ https://issues.apache.org/jira/browse/FLINK-10818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684814#comment-16684814 ] ambition edited comment on FLINK-10818 at 11/13/18 7:37 AM: logs file : [^jobmanager.log] ^[^taskmanager.log]^ was (Author: ambition): [^jobmanager.log] ^[^taskmanager.log]^ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10818) RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not enough free slots available to run the job.
[ https://issues.apache.org/jira/browse/FLINK-10818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684814#comment-16684814 ] ambition edited comment on FLINK-10818 at 11/13/18 7:36 AM: [^jobmanager.log] ^[^taskmanager.log]^ was (Author: ambition): [^jobmanager.log][^taskmanager.log] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10818) RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not enough free slots available to run the job.
[ https://issues.apache.org/jira/browse/FLINK-10818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684814#comment-16684814 ] ambition commented on FLINK-10818: -- [^jobmanager.log][^taskmanager.log]
[jira] [Updated] (FLINK-10818) RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not enough free slots available to run the job.
[ https://issues.apache.org/jira/browse/FLINK-10818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ambition updated FLINK-10818: - Attachment: jobmanager.log
[jira] [Updated] (FLINK-10818) RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not enough free slots available to run the job.
[ https://issues.apache.org/jira/browse/FLINK-10818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ambition updated FLINK-10818: - Attachment: taskmanager.log
[jira] [Created] (FLINK-10818) RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not enough free slots available to run the job.
ambition created FLINK-10818:

Summary: RestartStrategies.fixedDelayRestart Occur NoResourceAvailableException: Not enough free slots available to run the job.
Key: FLINK-10818
URL: https://issues.apache.org/jira/browse/FLINK-10818
Project: Flink
Issue Type: Bug
Components: Core
Affects Versions: 1.6.2
Environment: JDK 1.8
Flink 1.6.0
Hadoop 2.7.3
Reporter: ambition

In our online Flink on YARN environment, the job sets its restart strategy in code like this:
{code:java}
exeEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 1000L));
{code}
But after the job had been running for some days, the following exception occurred:
{code:java}
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #5 (Source: KafkaJsonTableSource -> Map -> where: (AND(OR(=(app_key, _UTF-16LE'C4FAF9CE1569F541'), =(app_key, _UTF-16LE'F5C7F68C7117630B'), =(app_key, _UTF-16LE'57C6FF4B5A064D29')), OR(=(LOWER(TRIM(FLAG(BOTH), _UTF-16LE' ', os_type)), _UTF-16LE'ios'), =(LOWER(TRIM(FLAG(BOTH), _UTF-16LE' ', os_type)), _UTF-16LE'android')), IS NOT NULL(server_id))), select: (MT_Date_Format_Mode(receive_time, _UTF-16LE'MMddHHmm', 10) AS date_p, LOWER(TRIM(FLAG(BOTH), _UTF-16LE' ', os_type)) AS os_type, MT_Date_Format_Mode(receive_time, _UTF-16LE'HHmm', 10) AS date_mm, server_id) (1/6)) @ (unassigned) - [SCHEDULED] > with groupID < cbc357ccb763df2852fee8c4fc7d55f2 > in sharing group < 690dbad267a8ff37c8cb5e9dbedd0a6d >. Resources available to scheduler: Number of instances=6, total number of slots=6, available slots=0
    at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:281)
    at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:155)
    at org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$2(Execution.java:491)
    at org.apache.flink.runtime.executiongraph.Execution$$Lambda$44/1664178385.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
    at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2116)
    at org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:489)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.allocateResourcesForAll(ExecutionJobVertex.java:521)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleEager(ExecutionGraph.java:945)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:875)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1262)
    at org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
    at org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{code}

This exception happened when the job started. The issue is linked to https://issues.apache.org/jira/browse/FLINK-4486
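The interaction between the fixed-delay restart strategy and the slot shortage in the log above can be illustrated with a small model. This is a hypothetical sketch, not Flink's scheduler. It assumes three things: a restart attempt succeeds only when the number of free slots is at least the job's highest operator parallelism (6 here, from `(1/6)` and `total number of slots=6`); free slots stay at 0 (as in `available slots=0`) until an assumed point in time `slotsBackAtMs`; and every failed attempt consumes one of the allowed restarts. `RestartWindowSketch`, `attemptThatSucceeds` and all parameters are illustrative names.

```java
/**
 * Hypothetical model of a fixed-delay restart strategy running into a slot
 * shortage. Illustrative only; this is not how Flink's scheduler is implemented.
 */
final class RestartWindowSketch {

    /**
     * Returns the 1-based attempt at which the job could be scheduled again,
     * or -1 if every allowed attempt fails.
     *
     * @param maxAttempts   restart attempts allowed (5 in fixedDelayRestart(5, 1000L))
     * @param delayMs       fixed delay before each attempt (1000 ms in the report)
     * @param requiredSlots slots the job needs; with default slot sharing this is
     *                      the highest operator parallelism (6 in the log)
     * @param totalSlots    slots in the cluster once all TaskManagers are back
     * @param slotsBackAtMs assumed time at which the slots become free again;
     *                      before that, 0 slots are free ("available slots=0")
     */
    static int attemptThatSucceeds(int maxAttempts, long delayMs, int requiredSlots,
                                   int totalSlots, long slotsBackAtMs) {
        long elapsedMs = 0;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            elapsedMs += delayMs; // wait the fixed delay, then try to reschedule
            int freeSlots = elapsedMs >= slotsBackAtMs ? totalSlots : 0;
            if (freeSlots >= requiredSlots) {
                return attempt; // enough free slots: the job is scheduled again
            }
            // Not enough slots: NoResourceAvailableException, this attempt is used up.
        }
        return -1; // all attempts consumed: the restart strategy gives up
    }
}
```

Under these assumptions, `fixedDelayRestart(5, 1000L)` only covers a window of about 5 × 1 s. If the slots take longer than that to come back (the 30 s used in the checks is an arbitrary value, not a measurement from this cluster), every attempt fails. A longer delay, or more attempts, widens the window in this model.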
[jira] [Commented] (FLINK-10674) DistinctAccumulator.remove lead to NPE
[ https://issues.apache.org/jira/browse/FLINK-10674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16679246#comment-16679246 ] ambition commented on FLINK-10674:
--
This exception occurred in our online Flink on YARN job. With the modified remove function shown in the description, it has not happened again.

> DistinctAccumulator.remove lead to NPE
>
> Key: FLINK-10674
> URL: https://issues.apache.org/jira/browse/FLINK-10674
> Project: Flink
> Issue Type: Bug
> Components: flink-contrib
> Affects Versions: 1.6.1
> Environment: Flink 1.6.0
> Reporter: ambition
> Assignee: winifredtang
> Priority: Minor
> Attachments: image-2018-10-25-14-46-03-373.png
>
>
> Our online Flink job had been running for about a week. The job contains this SQL:
> {code:java}
> select `time`,
>   lower(trim(os_type)) as os_type,
>   count(distinct feed_id) as feed_total_view
> from my_table
> group by `time`, lower(trim(os_type))
> {code}
>
> Then an NPE occurred:
>
> {code:java}
> java.lang.NullPointerException
>     at scala.Predef$.Long2long(Predef.scala:363)
>     at org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109)
>     at NonWindowedAggregationHelper$894.retract(Unknown Source)
>     at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124)
>     at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39)
>     at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
>
> Looking at DistinctAccumulator.remove:
> !image-2018-10-25-14-46-03-373.png!
>
> This NPE appears to be caused by currentCnt being null, so we handle it simply like this:
> {code:java}
> def remove(params: Row): Boolean = {
>   if (!distinctValueMap.contains(params)) {
>     true
>   } else {
>     val currentCnt = distinctValueMap.get(params)
>     // check for null before currentCnt is unboxed to a Scala Long
>     if (currentCnt == null || currentCnt == 1) {
>       distinctValueMap.remove(params)
>       true
>     } else {
>       var value = currentCnt - 1L
>       // defensive: never store a negative count
>       if (value < 0) {
>         value = 1
>       }
>       distinctValueMap.put(params, value)
>       false
>     }
>   }
> }
> {code}
[jira] [Created] (FLINK-10674) DistinctAccumulator.remove lead to NPE
ambition created FLINK-10674:

Summary: DistinctAccumulator.remove lead to NPE
Key: FLINK-10674
URL: https://issues.apache.org/jira/browse/FLINK-10674
Project: Flink
Issue Type: Bug
Components: flink-contrib
Affects Versions: 1.6.1
Environment: Flink 1.6.0
Reporter: ambition
Attachments: image-2018-10-25-14-46-03-373.png

Our online Flink job had been running for about a week. The job contains this SQL:
{code:java}
select `time`,
  lower(trim(os_type)) as os_type,
  count(distinct feed_id) as feed_total_view
from my_table
group by `time`, lower(trim(os_type))
{code}

Then an NPE occurred:
{code:java}
java.lang.NullPointerException
    at scala.Predef$.Long2long(Predef.scala:363)
    at org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109)
    at NonWindowedAggregationHelper$894.retract(Unknown Source)
    at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124)
    at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
{code}

Looking at DistinctAccumulator.remove:
!image-2018-10-25-14-46-03-373.png!

This NPE appears to be caused by currentCnt being null, so we handle it simply like this:
{code:java}
def remove(params: Row): Boolean = {
  if (!distinctValueMap.contains(params)) {
    true
  } else {
    val currentCnt = distinctValueMap.get(params)
    // check for null before currentCnt is unboxed to a Scala Long
    if (currentCnt == null || currentCnt == 1) {
      distinctValueMap.remove(params)
      true
    } else {
      var value = currentCnt - 1L
      // defensive: never store a negative count
      if (value < 0) {
        value = 1
      }
      distinctValueMap.put(params, value)
      false
    }
  }
}
{code}
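The NPE in the trace above is thrown by `scala.Predef$.Long2long`, that is, by unboxing a `null` count. The same failure, and a null guard in the spirit of the workaround, can be reproduced in plain Java. This is a hypothetical sketch: a `HashMap<String, Long>` stands in for the accumulator's map from distinct values to counts, and `DistinctCountSketch`, `add`, `remove` and `unsafeDecrement` are illustrative names, not Flink's API. `remove` follows the workaround's convention of returning true once the value is gone. One deliberate difference from the workaround: any count of 1 or less, including a missing one, removes the entry instead of rewriting negative counts to 1.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a distinct-value accumulator (not Flink's class). */
final class DistinctCountSketch {
    private final Map<String, Long> counts = new HashMap<>();

    /** Records one more occurrence of value; returns true if the value is new. */
    boolean add(String value) {
        Long current = counts.get(value);
        counts.put(value, current == null ? 1L : current + 1L);
        return current == null;
    }

    /** Removes one occurrence; returns true if the value is no longer present. */
    boolean remove(String value) {
        Long current = counts.get(value); // may be null: check before unboxing
        if (current == null || current <= 1L) {
            counts.remove(value);
            return true;
        }
        counts.put(value, current - 1L);
        return false;
    }

    /** Number of distinct values currently tracked. */
    int distinctCount() {
        return counts.size();
    }

    /** Unsafe variant: auto-unboxing a null Long throws NullPointerException. */
    static long unsafeDecrement(Map<String, Long> counts, String value) {
        long current = counts.get(value); // NPE when value is absent
        return current - 1L;
    }
}
```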
[jira] [Comment Edited] (FLINK-10299) RowSerializer.copy data value cast exception and use checkpoint function Lead to Could not restart this job
[ https://issues.apache.org/jira/browse/FLINK-10299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16608810#comment-16608810 ] ambition edited comment on FLINK-10299 at 9/10/18 12:51 PM:

Sorry, the past two days were not workdays. Here is a brief description of the complete process.

Flink consumes user app data captured from Kafka, and some values are erroneous, such as "-". The sample data:
{code:java}
{"event_id": "10001","uid":"1561529398","timestamp": "1536288421", "viewport_height": "667","viewport_width": "375","language":"zh-CN"}
{"event_id": "1002","uid":"1561529398","timestamp": "-", "viewport_height": "667","viewport_width": "375","language":"zh-CN" }
{"event_id": "1003","uid":"1561529398","timestamp": "1536288421", "viewport_height": "667","viewport_width": "-" ,"language":"zh-CN"}
{code}

Flink job code:
{code:java}
public class UserDataSQL {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.createLocalEnvironment();
        execEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        execEnv.getCheckpointConfig().setCheckpointInterval(5000L);
        execEnv.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        execEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1));

        FsStateBackend stateBackend = new FsStateBackend("hdfs:/flink/flink-checkpoints");
        execEnv.setStateBackend(stateBackend);

        StreamTableEnvironment env = StreamTableEnvironment.getTableEnvironment(execEnv);

        // Column types must be TypeInformation instances, not type-name strings
        // (Types = org.apache.flink.api.common.typeinfo.Types).
        Map<String, TypeInformation<?>> schemaMap = new LinkedHashMap<>();
        schemaMap.put("event_id", Types.INT);
        schemaMap.put("uid", Types.LONG);
        schemaMap.put("timestamp", Types.SQL_TIMESTAMP);
        schemaMap.put("viewport_height", Types.INT);
        schemaMap.put("viewport_width", Types.INT);
        schemaMap.put("language", Types.STRING);

        TableSchema tableSchema = new TableSchema(
                schemaMap.keySet().toArray(new String[0]),
                schemaMap.values().toArray(new TypeInformation<?>[0]));

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "xxx:9092");
        kafkaProps.setProperty("topic", "topic");
        kafkaProps.setProperty("enable.auto.commit", "true");
        kafkaProps.setProperty("group.id", "flink_group");

        Kafka010JsonTableSource kafka010JsonTableSource =
                new Kafka010JsonTableSource("topic", kafkaProps, tableSchema, tableSchema);
        kafka010JsonTableSource.setProctimeAttribute("timestamp");

        env.registerTableSource("user_data", kafka010JsonTableSource);
        env.registerTableSink("user_count", new MysqlTableUpsertSink());
        env.sqlUpdate("insert into user_count select count(uid) as uv, event_id from user_data group by event_id");
        execEnv.execute();
    }

    public static class MysqlTableUpsertSink implements UpsertStreamTableSink<Row> {
        // other code omitted
    }

    public static class UserData {
        public Integer event_id;
        public Long uid;
        public Timestamp timestamp;
        public Integer viewport_height;
        public Integer viewport_width;
        public String language;
        // other code omitted
    }
}
{code}

With checkpointing enabled, if the data contains an erroneous value, the job shuts down and cannot be restarted. Currently there are two ways to get the job running again:
1. Delete the FsStateBackend checkpoint data on HDFS.
2. Set the erroneous value to null, as in the picture I provided.

Is there a better way to record erroneous data without affecting the checkpoint function? Thanks.
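The second workaround in the comment, setting erroneous values to null, can be applied before the values reach typed columns such as `Timestamp` and `Integer`. The following is a minimal sketch in plain Java under stated assumptions: the raw JSON values are already available as strings, `timestamp` holds epoch seconds as in the sample data, and `LenientFields`, `parseIntOrNull` and `parseEpochSecondsOrNull` are hypothetical helpers, not Flink API. Anything that does not parse, such as "-", becomes null. The caller can then count or log the bad record instead of passing a mistyped value downstream.

```java
import java.sql.Timestamp;

/** Hypothetical helpers that map unparsable raw values such as "-" to null. */
final class LenientFields {

    /** Returns the integer value, or null if raw is null or not a valid int. */
    static Integer parseIntOrNull(String raw) {
        if (raw == null) {
            return null;
        }
        try {
            return Integer.valueOf(raw.trim());
        } catch (NumberFormatException e) {
            return null; // e.g. "-" in viewport_width
        }
    }

    /** Interprets raw as epoch seconds; returns null if it is not a valid long. */
    static Timestamp parseEpochSecondsOrNull(String raw) {
        if (raw == null) {
            return null;
        }
        try {
            long seconds = Long.parseLong(raw.trim());
            return new Timestamp(seconds * 1000L); // Timestamp expects milliseconds
        } catch (NumberFormatException e) {
            return null; // e.g. "-" in timestamp
        }
    }
}
```

For the third sample record, `parseIntOrNull("-")` returns null for `viewport_width`, while its `timestamp` value "1536288421" still parses normally.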
[jira] [Comment Edited] (FLINK-10299) RowSerializer.copy data value cast exception and use checkpoint function Lead to Could not restart this job
[ https://issues.apache.org/jira/browse/FLINK-10299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16608810#comment-16608810 ] ambition edited comment on FLINK-10299 at 9/10/18 7:15 AM:
[jira] [Updated] (FLINK-10299) RowSerializer.copy data value cast exception and use checkpoint function Lead to Could not restart this job
[ https://issues.apache.org/jira/browse/FLINK-10299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ambition updated FLINK-10299: - Summary: RowSerializer.copy data value cast exception and use checkpoint function Lead to Could not restart this job (was: RowSerializer.copy data value cast exception Lead to Could not restart this job) > RowSerializer.copy data value cast exception and use checkpoint function Lead > to Could not restart this job > --- > > Key: FLINK-10299 > URL: https://issues.apache.org/jira/browse/FLINK-10299 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.6.0 >Reporter: ambition >Priority: Minor > Attachments: image-2018-09-07-17-47-04-343.png > > > Flink SQL processes collected user behavior data, such as: > {code:java} > { > "event_id": "session_start", > "timestamp": "-", // erroneous data > "viewport_height": "667", > "viewport_width": "-" // erroneous data > } > {code} > This causes the following exception: > {code:java} > 2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO > executiongraph.ExecutionGraph (ExecutionGraph.java:tryRestartOrFail(1511)) - > Could not restart the job Flink Streaming Job > (6f0248219c631158f6e38f2dca0beb91) because the restart strategy prevented it.
> java.lang.ClassCastException: java.lang.String cannot be cast to > java.sql.Timestamp > at > org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer.copy(SqlTimestampSerializer.java:27) > at > org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95) > at > org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) > at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > 2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO > checkpoint.CheckpointCoordinator (CheckpointCoordinator.java:shutdown(320)) - > Stopping checkpoint coordinator for job 6f0248219c631158f6e38f2dca0beb91. > 2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO > checkpoint.StandaloneCompletedCheckpointStore > (StandaloneCompletedCheckpointStore.java:shutdown(102)) - Shutting down > {code} > With Flink checkpointing enabled, this uncaught exception means the job cannot be restarted, so for now the erroneous value is set to null when the exception occurs, as shown in the image below. We hope a Flink committer can provide a better solution. > !image-2018-09-07-17-47-04-343.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
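The null workaround described above can be sketched in plain Java. This is a minimal sketch under stated assumptions, not Flink code: `LenientFieldParser` is a hypothetical helper, it assumes the raw JSON field values are already available as strings, and it assumes the `timestamp` field carries epoch seconds. Placeholders such as "-" become null instead of reaching a typed field as a String.

```java
import java.sql.Timestamp;

// Hypothetical helper (not part of Flink): parses raw JSON field strings
// leniently, turning placeholders such as "-" into null instead of failing.
final class LenientFieldParser {

    private LenientFieldParser() {}

    // Returns the parsed Integer, or null if the value is missing or not a number.
    static Integer parseIntOrNull(String raw) {
        if (raw == null) {
            return null;
        }
        try {
            return Integer.valueOf(raw.trim());
        } catch (NumberFormatException e) {
            return null; // e.g. "-" in viewport_width
        }
    }

    // ASSUMPTION: the timestamp field holds epoch seconds (e.g. "1536288421").
    // Returns null for placeholders such as "-".
    static Timestamp parseEpochSecondsOrNull(String raw) {
        if (raw == null) {
            return null;
        }
        try {
            long seconds = Long.parseLong(raw.trim());
            return new Timestamp(seconds * 1000L);
        } catch (NumberFormatException e) {
            return null;
        }
    }
}
```

The trade-off of this approach is that the bad values disappear silently: the record continues with null fields, and nothing records that the original input was malformed unless it is also logged or counted elsewhere.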
[jira] [Commented] (FLINK-10299) RowSerializer.copy data value cast exception Lead to Could not restart this job
[ https://issues.apache.org/jira/browse/FLINK-10299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16608810#comment-16608810 ] ambition commented on FLINK-10299: -- Sorry for the delay; the past two days were not workdays. Here is a brief description of the complete process. Flink consumes user app data captured from Kafka, and some values are erroneous, such as "-". The sample data: {code:java}
{"event_id": "10001","uid":"1561529398","timestamp": "1536288421", "viewport_height": "667","viewport_width": "375","language":"zh-CN"}
{"event_id": "1002","uid":"1561529398","timestamp": "-", "viewport_height": "667","viewport_width": "375","language":"zh-CN" }
{"event_id": "1003","uid":"1561529398","timestamp": "1536288421", "viewport_height": "667","viewport_width": "-" ,"language":"zh-CN"}
{code} Flink job code: {code:java}
// Imports omitted; Types refers to org.apache.flink.api.common.typeinfo.Types.
public class UserDataSQL {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.createLocalEnvironment();
        execEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        execEnv.getCheckpointConfig().setCheckpointInterval(5000L);
        execEnv.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // At most 3 restart attempts, 1 ms apart.
        execEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1));
        FsStateBackend stateBackend = new FsStateBackend("hdfs:/flink/flink-checkpoints");
        execEnv.setStateBackend(stateBackend);

        StreamTableEnvironment env = StreamTableEnvironment.getTableEnvironment(execEnv);

        // Field name -> declared Flink type of the JSON schema.
        Map<String, TypeInformation<?>> schemaMap = new LinkedHashMap<>();
        schemaMap.put("event_id", Types.INT);
        schemaMap.put("uid", Types.LONG);
        schemaMap.put("timestamp", Types.SQL_TIMESTAMP);
        schemaMap.put("viewport_height", Types.INT);
        schemaMap.put("viewport_width", Types.INT);
        schemaMap.put("language", Types.STRING);
        TableSchema tableSchema = new TableSchema(
                schemaMap.keySet().toArray(new String[schemaMap.size()]),
                schemaMap.values().toArray(new TypeInformation[schemaMap.size()]));

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "xxx:9092");
        kafkaProps.setProperty("topic", "topic");
        kafkaProps.setProperty("enable.auto.commit", "true");
        kafkaProps.setProperty("group.id", "flink_group");
        Kafka010JsonTableSource kafka010JsonTableSource =
                new Kafka010JsonTableSource("topic", kafkaProps, tableSchema, tableSchema);
        kafka010JsonTableSource.setProctimeAttribute("timestamp");
        env.registerTableSource("user_data", kafka010JsonTableSource);
        env.registerTableSink("user_count", new MysqlTableUpsertSink());
        env.sqlUpdate("insert into user_count select count(uid) as uv, event_id from user_data group by event_id");
        execEnv.execute();
    }

    public static class MysqlTableUpsertSink implements UpsertStreamTableSink {
        // other code omitted
    }

    public static class UserData {
        public Integer event_id;
        public Long uid;
        public Timestamp timestamp;
        public Integer viewport_height;
        public Integer viewport_width;
        public String language;
        // other code omitted
    }
}
{code} With checkpointing enabled, if the data contains an erroneous value, the job shuts down and cannot be restarted. Currently there are two ways to get the job running again: 1. Delete the FsStateBackend checkpoint data on HDFS. 2. Set the erroneous value to null, as shown in the picture I provided. Is there a better way to record erroneous data without affecting the checkpoint function? Thanks
[jira] [Updated] (FLINK-10299) RowSerializer.copy data value cast exception Lead to Could not restart this job
[ https://issues.apache.org/jira/browse/FLINK-10299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ambition updated FLINK-10299: - Summary: RowSerializer.copy data value cast exception Lead to Could not restart this job (was: RowSerializer.copy data value cast exception)
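The new summary ties the exception to the failed restart. A plausible mechanism, sketched here as a simplified model rather than Flink's actual restart logic: after each failure the job restores the last completed checkpoint, whose Kafka offset lies before the bad record (in this model a checkpoint never advances past a record that was not emitted successfully), so every attempt replays the same record and fails again until the attempts allowed by `fixedDelayRestart(3, 1)` are used up. `PoisonRecordReplay` and its parameters are hypothetical, and the string "-" stands in for any record that throws.

```java
import java.util.List;

// Simplified, hypothetical model (not Flink code) of a fixed-delay restart
// strategy replaying a Kafka partition from the last completed checkpoint.
final class PoisonRecordReplay {

    private PoisonRecordReplay() {}

    /**
     * Returns the number of restarts attempted before giving up, or -1 if all
     * records were processed. checkpointOffset is the offset restored after
     * every failure; maxRestarts mirrors fixedDelayRestart(maxRestarts, delay).
     */
    static int run(List<String> records, int checkpointOffset, int maxRestarts) {
        int restarts = 0;
        while (true) {
            boolean failed = false;
            // Every (re)start resumes from the same checkpointed offset, because
            // no checkpoint can include the record that keeps failing.
            for (int offset = checkpointOffset; offset < records.size(); offset++) {
                if ("-".equals(records.get(offset))) {
                    failed = true; // stands in for the ClassCastException
                    break;
                }
            }
            if (!failed) {
                return -1;
            }
            if (restarts == maxRestarts) {
                return restarts; // "the restart strategy prevented it"
            }
            restarts++;
        }
    }
}
```

For example, `run(List.of("1", "2", "-", "4"), 0, 3)` returns 3: the model restarts three times, fails each time on the same record, and then gives up.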
[jira] [Updated] (FLINK-10299) RowSerializer.copy data value cast exception
[ https://issues.apache.org/jira/browse/FLINK-10299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ambition updated FLINK-10299: - Description: Flink SQL processes collected user behavior data, such as: {code:java} { "event_id": "session_start", "timestamp": "-", // erroneous data "viewport_height": "667", "viewport_width": "-" // erroneous data } {code} This causes the following exception: {code:java} 2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO executiongraph.ExecutionGraph (ExecutionGraph.java:tryRestartOrFail(1511)) - Could not restart the job Flink Streaming Job (6f0248219c631158f6e38f2dca0beb91) because the restart strategy prevented it. java.lang.ClassCastException: java.lang.String cannot be cast to java.sql.Timestamp at org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer.copy(SqlTimestampSerializer.java:27) at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95) at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398) at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89) at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748) 2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO checkpoint.CheckpointCoordinator (CheckpointCoordinator.java:shutdown(320)) - Stopping checkpoint coordinator for job 6f0248219c631158f6e38f2dca0beb91. 2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO checkpoint.StandaloneCompletedCheckpointStore (StandaloneCompletedCheckpointStore.java:shutdown(102)) - Shutting down {code} With Flink checkpointing enabled, this uncaught exception means the job cannot be restarted, so for now the erroneous value is set to null when the exception occurs, as shown in the image below. We hope a Flink committer can provide a better solution. !image-2018-09-07-17-47-04-343.png!
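The description asks for a better solution than nulling values. One common pattern is a dead-letter channel: validate each record before its values reach typed fields, and divert failures into a separate collection instead of throwing. The sketch below is plain Java under stated assumptions: records arrive as field-name-to-string maps, and `DeadLetterRouter`, `route`, and the integer-only validation rule are all hypothetical. In a Flink DataStream job, the same idea would typically map onto a side output (an `OutputTag` emitted from a `ProcessFunction`); wiring that into the Table API source used here is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not a Flink API): split records into valid rows and a
// dead-letter list instead of throwing, so one bad value cannot fail the job.
final class DeadLetterRouter {

    // Outcome of routing one batch of raw records.
    static final class Result {
        final List<Map<String, String>> valid = new ArrayList<>();
        final List<String> deadLetters = new ArrayList<>();
    }

    private DeadLetterRouter() {}

    // numericFields lists the fields that must parse as integers (assumed schema).
    static Result route(List<Map<String, String>> records, List<String> numericFields) {
        Result result = new Result();
        for (Map<String, String> record : records) {
            String badField = firstInvalidField(record, numericFields);
            if (badField == null) {
                result.valid.add(record);
            } else {
                // Keep the offending field, its value and the raw record for later inspection.
                result.deadLetters.add(badField + "=" + record.get(badField) + " in " + record);
            }
        }
        return result;
    }

    // Returns the first field that is missing or not an integer, or null if all are valid.
    private static String firstInvalidField(Map<String, String> record, List<String> numericFields) {
        for (String field : numericFields) {
            try {
                Long.parseLong(record.get(field)); // a missing field (null) also throws NumberFormatException
            } catch (NumberFormatException e) {
                return field;
            }
        }
        return null;
    }
}
```

Compared with nulling, the bad input stays visible: the dead-letter entries could be written to a separate topic or table and inspected or replayed later, while the main pipeline keeps running and checkpointing.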
[jira] [Created] (FLINK-10299) RowSerializer.copy data value cast exception
ambition created FLINK-10299: Summary: RowSerializer.copy data value cast exception Key: FLINK-10299 URL: https://issues.apache.org/jira/browse/FLINK-10299 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.6.0 Reporter: ambition Attachments: image-2018-09-07-17-47-04-343.png Flink SQL processes collected user behavior data, such as: {code:java} { "event_id": "session_start", "timestamp": "-", // erroneous data "viewport_height": "667", "viewport_width": "-" // erroneous data } {code} This causes the following exception: {code:java} 2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO executiongraph.ExecutionGraph (ExecutionGraph.java:tryRestartOrFail(1511)) - Could not restart the job Flink Streaming Job (6f0248219c631158f6e38f2dca0beb91) because the restart strategy prevented it. java.lang.ClassCastException: java.lang.String cannot be cast to java.sql.Timestamp at org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer.copy(SqlTimestampSerializer.java:27) at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95) at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398) at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89) at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748) 2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO checkpoint.CheckpointCoordinator (CheckpointCoordinator.java:shutdown(320)) - Stopping checkpoint coordinator for job 6f0248219c631158f6e38f2dca0beb91. 2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO checkpoint.StandaloneCompletedCheckpointStore (StandaloneCompletedCheckpointStore.java:shutdown(102)) - Shutting down {code} We use Flink checkpointing, and this uncaught exception means the job could not be restarted, so we applied a simple workaround for now. We hope a Flink committer can provide a better solution. !image-2018-09-07-17-47-04-343.png!
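The stack trace shows `RowSerializer.copy` delegating a field to `SqlTimestampSerializer.copy`, which fails because the field holds the String "-" rather than a `java.sql.Timestamp`. The simplified model below (a hypothetical `RowCopyModel`, not Flink's class) reproduces that shape: a per-field copy chooses its logic from the declared type, casts the value accordingly, and passes null fields through without casting. The null pass-through is an assumption chosen to match the reporter's observation that a null value avoids the exception; treat the model as an illustration, not as Flink's exact code.

```java
import java.sql.Timestamp;

// Simplified, hypothetical model (not Flink's RowSerializer) of copying a row
// field by field, with the copy logic selected from each field's declared type.
final class RowCopyModel {

    private RowCopyModel() {}

    static Object[] copy(Object[] row, Class<?>[] declaredTypes) {
        Object[] copy = new Object[row.length];
        for (int i = 0; i < row.length; i++) {
            Object value = row[i];
            if (value == null) {
                copy[i] = null; // null fields are passed through without any cast
            } else if (declaredTypes[i] == Timestamp.class) {
                // Trusts the declared type: a String here throws ClassCastException,
                // matching "java.lang.String cannot be cast to java.sql.Timestamp".
                Timestamp ts = (Timestamp) value;
                Timestamp tsCopy = new Timestamp(ts.getTime());
                tsCopy.setNanos(ts.getNanos());
                copy[i] = tsCopy;
            } else {
                copy[i] = value; // other types are treated as immutable in this model
            }
        }
        return copy;
    }
}
```

The model suggests why the failure surfaces far from the JSON parsing: the mismatch is only detected when a typed copy is made between chained operators, so validating or converting values at the source is the earlier and cheaper place to catch it.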