Re: custom writer fail to recover
Hi Das, Have you gotten your .pending issue resolved? I am running into the same issue: the Parquet files all stay in pending status. Could you please share your solution? Thanks, Tao -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Pending parquet file with Bucketing Sink
Hi Vipul, Thanks for the information. Yes, I do have checkpointing enabled, with a 10 ms interval. I think the issue here is that the stream ended before a checkpoint was reached. This is test code in which the DataStream has only 5 events and then ends. Once the stream has ended, no checkpoint is triggered, so the file remains in the "pending" state. Is there any way to force a checkpoint, or to let the sink know that the stream has ended? Thanks, Tao
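The effect Tao describes can be reproduced with a simplified, standalone model of the part-file lifecycle used by Flink's BucketingSink: a file is in-progress while being written, becomes pending once it is closed (on roll-over or when the sink shuts down), and is only moved to its final state when a checkpoint taken after it was closed completes. The Java sketch below is a hypothetical model of that rule, not Flink's code; the class and method names (`PartFileModel`, `closeCurrentFile`, `snapshot`, `checkpointComplete`) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a checkpoint-driven part-file lifecycle
// (in-progress -> pending -> finished). Not Flink's implementation.
class PartFileModel {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    private final Map<String, State> files = new HashMap<>();
    // Pending files not yet included in any checkpoint snapshot.
    private final List<String> pendingSinceLastSnapshot = new ArrayList<>();
    // Pending files recorded per checkpoint id, waiting for that checkpoint to complete.
    private final Map<Long, List<String>> pendingPerCheckpoint = new HashMap<>();
    private String current;
    private int counter = 0;

    // Every write goes to the current in-progress file, opening one if needed.
    void write() {
        if (current == null) {
            current = "part-" + (counter++);
            files.put(current, State.IN_PROGRESS);
        }
    }

    // Rolling or closing the sink turns the in-progress file into a pending one.
    void closeCurrentFile() {
        if (current != null) {
            files.put(current, State.PENDING);
            pendingSinceLastSnapshot.add(current);
            current = null;
        }
    }

    // A snapshot remembers which pending files belong to this checkpoint.
    void snapshot(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pendingSinceLastSnapshot));
        pendingSinceLastSnapshot.clear();
    }

    // Only a completed checkpoint moves its pending files to FINISHED.
    void checkpointComplete(long checkpointId) {
        pendingPerCheckpoint.entrySet().removeIf(e -> {
            if (e.getKey() <= checkpointId) {
                for (String f : e.getValue()) {
                    files.put(f, State.FINISHED);
                }
                return true;
            }
            return false;
        });
    }

    State stateOf(String file) {
        return files.get(file);
    }

    public static void main(String[] args) {
        // A finite stream: 5 events, then the job ends before any checkpoint.
        PartFileModel sink = new PartFileModel();
        for (int i = 0; i < 5; i++) {
            sink.write();
        }
        sink.closeCurrentFile(); // end of input closes the sink
        System.out.println(sink.stateOf("part-0")); // no checkpoint ever completed
    }
}
```

In this model `part-0` can only reach FINISHED through `checkpointComplete`, so a 5-event test stream that ends before the first checkpoint leaves it pending, which matches the behaviour described above.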
Re: Anyone got Flink working in EMR with KinesisConnector
Got the issue fixed after applying the patch from https://github.com/apache/flink/pull/4150
NoClassDefFoundError of an Avro class after cancel then resubmit the same job
Not sure why, but when I submit the job for the first time after a cluster launch, it works fine. After I cancel that job and resubmit the same job, it hits a NoClassDefFoundError. Very weird; it feels like some cleanup of the cancelled job messes up later jobs that use the same classes. Has anyone run into the same issue?

java.lang.NoClassDefFoundError: com.xxx.yyy.zzz$Builder
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.getDeclaredMethods(Class.java:1975)
    at org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods(TypeExtractionUtils.java:243)
    at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1949)
    at org.apache.flink.api.java.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema(AvroTypeInfo.java:55)
    at org.apache.flink.api.java.typeutils.AvroTypeInfo.<init>(AvroTypeInfo.java:48)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1810)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1716)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:953)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:814)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:768)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:764)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createSubclassSerializer(PojoSerializer.java:1129)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.getSubclassSerializer(PojoSerializer.java:1122)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:253)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:309)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:408)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:486)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:263)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:209)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.xxx.yyy.zzz$Builder
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 31 more
How to access JobManager and TaskManager
In the web UI, I can see the following information under JobManager. How can I access the variable job_env in my main method?

Job Manager Configuration
env.hadoop.conf.dir  /etc/hadoop/conf
env.yarn.conf.dir  /etc/hadoop/conf
high-availability.cluster-id  application_1517362137681_0001
job_env  stage
jobmanager.rpc.address  ip-172-32-37-243.us-west-2.compute.internal
jobmanager.rpc.port  46253
jobmanager.web.port  0
taskmanager.numberOfTaskSlots  4

When the TaskManager starts, I also notice that the same setting "job_env" is loaded by GlobalConfiguration:

2018-01-31 01:34:54,970 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: env.yarn.conf.dir, /etc/hadoop/conf
2018-01-31 01:34:54,976 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.maxRegistrationDuration, 5 minutes
2018-01-31 01:34:54,979 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability.cluster-id, application_1517362137681_0001
2018-01-31 01:34:54,979 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
2018-01-31 01:34:54,979 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2018-01-31 01:34:54,982 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, ip-172-32-37-243.us-west-2.compute.internal
2018-01-31 01:34:54,982 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: job_env, stage
2018-01-31 01:34:54,982 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.web.port, 0
2018-01-31 01:34:54,983 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 46253

BUT when I try to access or print out all the variables in my main method:

    import org.apache.flink.configuration.GlobalConfiguration
    import scala.collection.JavaConverters._

    // toMap returns a java.util.Map, so convert it before iterating in Scala
    val configs = GlobalConfiguration.loadConfiguration().toMap.asScala
    for ((k, v) <- configs) println(s"Configs key: $k, value: $v")

I only get these 3:

Configs key: env.hadoop.conf.dir, value: /etc/hadoop/conf
Configs key: taskmanager.numberOfTaskSlots, value: 4
Configs key: env.yarn.conf.dir, value: /etc/hadoop/conf

Can anyone help?
Re: How to access JobManager and TaskManager
Hi Tim, "job_env" is a variable I passed when launching the YARN application. I just want to access it in the main method of my Flink application. There is no documentation on how to access customized job environment variables or settings. Thanks, Tao
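One possible workaround for reading "job_env" in main, sketched under the assumption that the value can be passed as a program argument (after the jar path on the `flink run` command line) rather than as a cluster configuration property, is to read it from the `args` of `main`. Flink's `ParameterTool.fromArgs` parses such `--key value` arguments; the standalone `ArgsParser` below is a hypothetical stand-in so the example runs without Flink on the classpath, and the `"dev"` default is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flink's ParameterTool.fromArgs: parses
// "--key value" pairs from the program arguments passed to main.
class ArgsParser {
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].startsWith("--")) {
                String key = args[i].substring(2);
                // Take the next token as the value unless it is another key.
                if (i + 1 < args.length && !args[i + 1].startsWith("--")) {
                    params.put(key, args[++i]);
                } else {
                    params.put(key, ""); // key given without a value
                }
            }
        }
        return params;
    }

    public static void main(String[] args) {
        // e.g. flink run ... my-job.jar --job_env stage
        Map<String, String> params = parse(args);
        String jobEnv = params.getOrDefault("job_env", "dev"); // "dev" is an assumed default
        System.out.println("job_env = " + jobEnv);
    }
}
```

The design choice here is to keep job-specific settings out of the cluster configuration entirely, so that main does not depend on which configuration file the client process happens to read.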
how to match external checkpoints with jobs during recovery
The externalized checkpoints are named in the format checkpoint_metadata-0057, so when multiple jobs are running at the same time I have no idea which job a given checkpoint metadata file belongs to. If a job fails unexpectedly, I need to know which checkpoints belong to the failed job. Is there an API or some other way to show the checkpoint folder for a given job? Thanks for the help, Tao
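Absent a direct job-to-checkpoint mapping, one manual way to narrow down the candidates is to list the metadata files in the checkpoint directory by modification time and compare that with the time the job failed. The Java sketch below does only that; it assumes the directory is reachable as a local path, the `checkpoint_metadata-` prefix is taken from the file name quoted above, `CheckpointLister` is a hypothetical name, and the result is a heuristic, not a reliable job lookup.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists externalized checkpoint metadata files in a
// local directory, newest first, so their timestamps can be compared with
// the time a job failed. This is a heuristic, not a job-id lookup.
class CheckpointLister {
    static List<Path> newestFirst(Path dir) throws IOException {
        List<Path> result = new ArrayList<>();
        // The glob keeps only files named like checkpoint_metadata-0057.
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "checkpoint_metadata-*")) {
            for (Path p : stream) {
                result.add(p);
            }
        }
        // Sort by modification time, most recent first.
        result.sort((a, b) -> lastModified(b).compareTo(lastModified(a)));
        return result;
    }

    private static FileTime lastModified(Path p) {
        try {
            return Files.getLastModifiedTime(p);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        for (Path p : newestFirst(dir)) {
            System.out.println(lastModified(p) + "  " + p.getFileName());
        }
    }
}
```

With several jobs writing into the same directory, the timestamps only shrink the list of candidates; they cannot prove which job wrote a given file.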
Support COUNT(DISTINCT 'field') Query Yet?
I am running the following query on Flink 1.3.2:

SELECT TUMBLE_START(event_timestamp, INTERVAL '1' HOUR),
       COUNT(DISTINCT session),
       COUNT(DISTINCT user_id),
       SUM(duration),
       SUM(num_interactions)
FROM unified_events
GROUP BY TUMBLE(event_timestamp, INTERVAL '1' HOUR)

but I get the error message:

Caused by: org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query

Is this feature supported yet? If yes, in which version of Flink? If not, is there a timeline for supporting it? Thanks, Tao
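To make the intended semantics of the hourly TUMBLE query concrete, the Java sketch below computes the same aggregates over an in-memory list of events: rows are grouped into one-hour tumbling windows by event_timestamp, and each window reports its start plus COUNT(DISTINCT session), COUNT(DISTINCT user_id), SUM(duration) and SUM(num_interactions). The `Event` and `HourlyStats` types and the field types are assumptions mirroring the column names; the sketch is plain Java and says nothing about which Flink version supports the query.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Plain-Java illustration of what the hourly TUMBLE query computes; not Flink code.
class HourlyTumble {
    static final long HOUR_MS = 60L * 60L * 1000L;

    // Hypothetical row type mirroring the columns of unified_events.
    static class Event {
        final long eventTimestampMs;
        final String session;
        final String userId;
        final long duration;
        final long numInteractions;

        Event(long eventTimestampMs, String session, String userId, long duration, long numInteractions) {
            this.eventTimestampMs = eventTimestampMs;
            this.session = session;
            this.userId = userId;
            this.duration = duration;
            this.numInteractions = numInteractions;
        }
    }

    // Per-window accumulator: sets for the DISTINCT counts, longs for the SUMs.
    static class HourlyStats {
        final Set<String> sessions = new HashSet<>();
        final Set<String> users = new HashSet<>();
        long durationSum;
        long interactionsSum;

        int distinctSessions() { return sessions.size(); }
        int distinctUsers() { return users.size(); }
    }

    // Keys are window starts (TUMBLE_START): timestamps rounded down to a
    // whole hour since the epoch. TreeMap keeps the windows in time order.
    static Map<Long, HourlyStats> aggregate(List<Event> events) {
        Map<Long, HourlyStats> windows = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.eventTimestampMs - Math.floorMod(e.eventTimestampMs, HOUR_MS);
            HourlyStats s = windows.computeIfAbsent(windowStart, k -> new HourlyStats());
            s.sessions.add(e.session);              // COUNT(DISTINCT session)
            s.users.add(e.userId);                  // COUNT(DISTINCT user_id)
            s.durationSum += e.duration;            // SUM(duration)
            s.interactionsSum += e.numInteractions; // SUM(num_interactions)
        }
        return windows;
    }
}
```

The distinct counts are what make this query harder than the plain SUMs: each window has to remember every session and user id it has seen, not just a running total.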
Re: Checkpoint is not triggering as per configuration
I ran into a similar issue. With a "Custom File Source", the source task only lists the folder/file paths of all existing files; the next operator, "Split Reader", reads the file contents. The "Custom File Source" switches to the FINISHED state after the first couple of seconds. That's why we get the error message "Custom File Source (1/1) is not being executed at the moment. Aborting checkpoint": the "Custom File Source" has already finished. Is this by design? Although the "Custom File Source" finishes within seconds, the rest of the pipeline can run for hours or days. Whenever anything goes wrong, the pipeline restarts and reads from the beginning again, since there is no checkpoint.
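The abort message quoted above is consistent with a rule that a checkpoint is only triggered while every source (triggering) task is still running: once the file-listing source has finished, later trigger attempts are aborted even though the downstream readers keep working. The Java sketch below is a simplified, hypothetical model of that check, not Flink's CheckpointCoordinator code; the `TaskState` enum and the task names are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Simplified, hypothetical model: a checkpoint can only be triggered if
// every source task that receives the trigger is still RUNNING.
class CheckpointTriggerModel {
    enum TaskState { RUNNING, FINISHED }

    // Returns a reason for aborting, or empty if the checkpoint can start.
    static Optional<String> checkTrigger(Map<String, TaskState> sourceTasks) {
        for (Map.Entry<String, TaskState> e : sourceTasks.entrySet()) {
            if (e.getValue() != TaskState.RUNNING) {
                return Optional.of(e.getKey()
                        + " is not being executed at the moment. Aborting checkpoint.");
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Only source tasks are checked; the downstream "Split Reader"
        // is not part of this map, however long it keeps running.
        Map<String, TaskState> sources = new LinkedHashMap<>();
        // The file-listing source finishes after a few seconds ...
        sources.put("Custom File Source (1/1)", TaskState.FINISHED);
        // ... so every later trigger attempt is aborted.
        System.out.println(checkTrigger(sources).orElse("checkpoint triggered"));
    }
}
```

In this model no checkpoint can ever complete after the source finishes, so a restart has nothing to restore from and the files are read again from the beginning.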