Re: flink app crashed
Hi Rainie,

I am relatively new to Flink, but I suspect that your error is somewhere else in the log. I have found most of my problems by searching the log for the word "error" or "exception". Since all of these log lines are at the INFO level, they are probably not highlighting any real issue. If you send more of the log, or find an error line, that might help others debug.

Thanks,
Jesse

From: Rainie Li
Date: Wednesday, July 15, 2020 at 10:54 AM
To: "user@flink.apache.org"
Subject: flink app crashed

Hi All,

I am new to Flink. Any idea why my Flink app's Job Manager is stuck? Here is the bottom part of the Job Manager log. Any suggestion will be appreciated.

2020-07-15 16:49:52,749 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
2020-07-15 16:49:52,759 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
2020-07-15 16:49:52,759 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2020-07-15 16:49:52,762 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService - Starting ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
2020-07-15 16:49:52,790 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Dispatcher /user/dispatcher was granted leadership with fencing token
2020-07-15 16:49:52,791 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Recovering all persisted jobs.
2020-07-15 16:49:52,931 INFO org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing over to rm1
2020-07-15 16:49:53,014 INFO org.apache.flink.yarn.YarnResourceManager - Recovered 0 containers from previous attempts ([]).
2020-07-15 16:49:53,018 INFO org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl - Upper bound of the thread pool size is 500
2020-07-15 16:49:53,020 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - yarn.client.max-cached-nodemanagers-proxies : 0
2020-07-15 16:49:53,021 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService - Starting ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
2020-07-15 16:49:53,042 INFO org.apache.flink.yarn.YarnResourceManager - ResourceManager akka.tcp://flink@cluster-dev-001/user/resourcemanager was granted leadership with fencing token
2020-07-15 16:49:53,046 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Starting the SlotManager.
2020-07-15 16:50:52,217 INFO org.apache.kafka.clients.Metadata - Cluster ID: FZrfSqHiTpaZwEzIRYkCLQ

Thanks
Best regards
Rainie
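A concrete form of the search Jesse suggests above, as a sketch (the log file name is illustrative; point it at your actual JobManager log, e.g. the YARN container log):

    grep -inE "error|exception" jobmanager.log

Here -i makes the match case-insensitive, -n prints line numbers so you can jump to the surrounding context, and -E enables the alternation pattern.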
Pyflink sink rowtime field
I am trying to sink the rowtime field in PyFlink 1.10. For the source schema I use

.field("rowtime", DataTypes.TIMESTAMP(2))
.rowtime(
    Rowtime()
    .timestamps_from_field("timestamp")
    .watermarks_periodic_ascending()
)

to create the rowtime field, and in the sink schema I have tried variations on

.field("rowtime", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())

Trying all of the different types in DataTypes, I get essentially the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o56.insertInto.
: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink `default_catalog`.`default_database`.`output` do not match.
Query result schema: [rowtime: LocalDateTime]
TableSink schema:    [rowtime: Timestamp]

I know that in Java there is org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME, and the Python documentation lists Types.SQL_TIMESTAMP, but I cannot find the corresponding type in the Python library. Can anyone help point me to the correct type for the schema?

Thanks,
Jesse
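For anyone trying to reproduce this, here is a minimal sketch assembling the fragments above into complete schema definitions. The field name "timestamp" and the timestamp precision come from the post; the rest is the standard PyFlink 1.10 descriptor API:

from pyflink.table import DataTypes
from pyflink.table.descriptors import Schema, Rowtime

# Source schema as described in the post: "timestamp" is the
# event-time field in the input records.
source_schema = Schema() \
    .field("rowtime", DataTypes.TIMESTAMP(2)) \
    .rowtime(
        Rowtime()
        .timestamps_from_field("timestamp")
        .watermarks_periodic_ascending())

# One of the sink-schema attempts that triggers the
# LocalDateTime vs. Timestamp mismatch at insertInto time.
sink_schema = Schema() \
    .field("rowtime", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())

For comparison, the Java DataType API has bridgedTo() (e.g. DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class)) for exactly this kind of logical-type/physical-class mismatch; whether PyFlink 1.10 exposes an equivalent is unclear.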
Pyflink JavaPackage Error
I am trying to read from Kafka using the PyFlink Table API on EMR 5.30, Flink 1.10 (and I have reproduced the error with 1.11). I am getting the following error using either `flink run -py` or pyflink-shell.sh (the error message below was generated in the pyflink shell):

>>> Kafka()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/tmp/8772bc6d-f275-4ecf-bdcc-d2c34d910473pyflink.zip/pyflink/table/descriptors.py", line 705, in __init__
TypeError: 'JavaPackage' object is not callable
>>> Csv()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/tmp/8772bc6d-f275-4ecf-bdcc-d2c34d910473pyflink.zip/pyflink/table/descriptors.py", line 398, in __init__
TypeError: 'JavaPackage' object is not callable
>>> Json()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/tmp/8772bc6d-f275-4ecf-bdcc-d2c34d910473pyflink.zip/pyflink/table/descriptors.py", line 553, in __init__
TypeError: 'JavaPackage' object is not callable

I assume this is telling me I don't have the Flink Kafka connector jar, but I have not been able to figure out how to provide the right jars. I have tried `flink run -j /path/to/flink-connector-kafka-base_2.11-1.10.0.jar`, `flink run --classpath /path/to/dependency/dir`, and `flink run -Dyarn.provided.lib.dirs=hdfs:///flink/kafka/dependencies`. Is there a way to provide the jar dependencies when submitting a Python job (or does this error indicate something else)?

Thanks,
Jesse
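For what it's worth, the "'JavaPackage' object is not callable" error means the JVM backing the Python gateway cannot see the connector classes at all, so the jar has to be on the classpath when the gateway starts, not just at job submission. Two documented approaches, sketched here with an illustrative jar path: on Flink 1.10, drop a fat connector jar such as flink-sql-connector-kafka into $FLINK_HOME/lib; on Flink 1.11+, point the table environment at it via the pipeline.jars option:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance()
        .in_streaming_mode()
        .use_blink_planner()
        .build())

# Flink 1.11+: hand the connector jar to the gateway before using
# Kafka()/Json()/Csv(). The path and jar version below are illustrative;
# use the flink-sql-connector-kafka jar matching your Flink/Scala version.
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///path/to/flink-sql-connector-kafka_2.11-1.11.0.jar")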
Re: Parquet data stream group converter error
I should have mentioned that I was able to read the same file with the batch ParquetTableSource. It is only when reading it as a stream that I encounter this error.

- Jesse

From: Khachatryan Roman
Sent: Friday, July 3, 2020 12:08:51 AM
To: Jesse Lord
Cc: user@flink.apache.org
Subject: Re: Parquet data stream group converter error

Hi,

> MessageType schema = reader.getFooter().getFileMetaData().getSchema();

The first thing I'd suggest is to verify that the file contains a valid schema and can be read by some other program, e.g. parquet-tools schema or cat [1].

Regards,
Roman

On Thu, Jul 2, 2020 at 11:36 PM Jesse Lord <jl...@vectra.ai> wrote:

I am trying to read a Parquet file into a DataStream and then register that stream as a temporary table. The file is created by Spark 2.4 in HDFS on AWS EMR. I am using Flink 1.10.0 with EMR 5.30. I am getting the following error:

Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: null
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"
    at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
    at org.apache.flink.formats.parquet.utils.ParquetRecordReader.createRecordReader(ParquetRecordReader.java:118)
    at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:227)
    at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
    at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:327)

Below is a snippet of code that shows how I am trying to read the Parquet file:

String filePath = "hdfs:///path/to/single/file.parquet";
ParquetFileReader reader = ParquetFileReader.open(
    HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(filePath), new Configuration()));
MessageType schema = reader.getFooter().getFileMetaData().getSchema();

String parquetPath = "hdfs:///path/to/parquet/directory";
DataStream<Row> parquetStream = env.readFile(
    new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(parquetPath), schema),
    parquetPath);

Table parquetTable = tEnv.fromDataStream(parquetStream);
tEnv.createTemporaryView("isession", parquetTable);

Thanks,
Jesse
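Concretely, the check Roman suggests would look something like this (the file path is illustrative, matching the snippet above):

    parquet-tools schema hdfs:///path/to/single/file.parquet
    parquet-tools cat --json hdfs:///path/to/single/file.parquet | head

If both commands print sensible output, the file and its footer schema are valid and the problem is on the Flink reader side.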
Parquet data stream group converter error
I am trying to read a Parquet file into a DataStream and then register that stream as a temporary table. The file is created by Spark 2.4 in HDFS on AWS EMR. I am using Flink 1.10.0 with EMR 5.30. I am getting the following error:

Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: null
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"
    at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
    at org.apache.flink.formats.parquet.utils.ParquetRecordReader.createRecordReader(ParquetRecordReader.java:118)
    at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:227)
    at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
    at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:327)

Below is a snippet of code that shows how I am trying to read the Parquet file:

// Read the footer of one file to obtain the Parquet schema.
String filePath = "hdfs:///path/to/single/file.parquet";
ParquetFileReader reader = ParquetFileReader.open(
    HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(filePath), new Configuration()));
MessageType schema = reader.getFooter().getFileMetaData().getSchema();

// Stream all files in the directory using that schema.
String parquetPath = "hdfs:///path/to/parquet/directory";
DataStream<Row> parquetStream = env.readFile(
    new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(parquetPath), schema),
    parquetPath);

// Register the stream as a temporary table.
Table parquetTable = tEnv.fromDataStream(parquetStream);
tEnv.createTemporaryView("isession", parquetTable);

Thanks,
Jesse