Re: TypeInformation problem

2019-12-15 Thread vino yang
Hi Nick,

From StackOverflow, I see a similar issue which was answered by @Till Rohrmann. [1]
FYI.

Best,
Vino

[1]:
https://stackoverflow.com/questions/38214958/flink-error-specifying-keys-via-field-positions-is-only-valid-for-tuple-data-ty
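
For context on why that answer applies here (my reading of the error, not stated in the thread): `classOf` erases a Scala tuple's type parameters, so `TypeInformation.of(classOf[(Int, Long, Double, Int)])` only ever sees the raw `scala.Tuple4` class, falls back to `GenericTypeInfo`, and positional `keyBy(0)` then rejects the GenericType. A minimal runnable illustration of the erasure; the fix sketched in the comments assumes the Flink Scala API is on the classpath:

```scala
// classOf erases type parameters: at runtime Flink only sees
// scala.Tuple4 with Object-typed fields, so
// TypeInformation.of(classOf[(Int, Long, Double, Int)]) produces a
// GenericTypeInfo rather than a tuple type info -- and keyBy(0)
// (positional keys) fails for GenericType.
val erased: Class[_] = classOf[(Int, Long, Double, Int)]
// erased is the same Class object for every Tuple4 instantiation.

// The usual fix (a sketch, not run here): let the Scala API derive
// the type information at compile time instead of via reflection.
//   import org.apache.flink.streaming.api.scala._
//   implicit val ti: TypeInformation[(Int, Long, Double, Int)] =
//     createTypeInformation[(Int, Long, Double, Int)]
```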


TypeInformation problem

2019-12-13 Thread Nicholas Walton
I was refactoring some Flink code to use IndexedSeq rather than Array. When I 
compiled the code I had failures that, according to the URL below, required the 
following to be inserted:
/*
 * Type information (see
 * https://stackoverflow.com/questions/37920023/could-not-find-implicit-value-for-evidence-parameter-of-type-org-apache-flink-ap)
 *
 * Code, when ported to use IndexedSeq rather than Array
 * and similar, refuses to build without this information.
 */
implicit val typeInfo1 = TypeInformation.of(classOf[(Int, Long, Double, Int)])
implicit val typeInfo2 = TypeInformation.of(classOf[(Int, Long, Double, Double)])
implicit val typeInfo3 = TypeInformation.of(classOf[(Int, Long, Double, IndexedSeq[Long])])
implicit val typeInfo4 = TypeInformation.of(classOf[(Int, Long, Double, IndexedSeq[BigInt])])
implicit val typeInfo5 = TypeInformation.of(classOf[(Int, Long, Double, IndexedSeq[String])])
implicit val typeInfo6 = TypeInformation.of(classOf[(String, Int, Long, Double)])
implicit val typeInfo7 = TypeInformation.of(classOf[(Int, Long, Double, IndexedSeq[String], Int)])
implicit val typeInfo8 = TypeInformation.of(classOf[(Int, Long, Double, String, Int)])

The code now compiles fine, but I have a problem with the code below, which 
was working perfectly before I added the declarations above and made the 
IndexedSeq refactor:

val readings: DataStream[(Int, Long, Double, Int)] = stream
  .flatMap(new splitReadings())
  .setParallelism(1)
  .assignTimestampsAndWatermarks(new readingTimstamps)
  .setParallelism(1)


val maxChannelScaled: DataStream[(Int, Long, Double, Double)] = readings
  .keyBy(0)
  .countWindow(runmaxWinLen, 1)
  .process(new runningMax())
  .setParallelism(2 * env.getParallelism)
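
(An aside on the `keyBy(0)` above: positional keys require Flink to see the element type as a tuple type. A key-selector function sidesteps positional keys entirely, so it works even when the type is treated as GenericType. A sketch below; the commented pipeline reuses `readings`, `runmaxWinLen`, and `runningMax` from the code above and is not run here:)

```scala
// Key selector: a plain function from the element type to the key.
// Unlike keyBy(0), selector-based keying does not depend on the
// element type being recognised as a tuple type.
val byChannel: ((Int, Long, Double, Int)) => Int = _._1

// In the pipeline (sketch, mirrors the original job):
//   val maxChannelScaled = readings
//     .keyBy(byChannel)
//     .countWindow(runmaxWinLen, 1)
//     .process(new runningMax())
```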

When I submit the job I find the following in the log

2019-12-13 15:37:35,600 INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class scala.Tuple4 does not contain a setter for field _1
2019-12-13 15:37:35,601 INFO  org.apache.flink.api.java.typeutils.TypeExtractor - Class class scala.Tuple4 cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2019-12-13 15:37:35,602 ERROR org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler   - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Specifying keys via field positions is only valid for tuple data types. Type: GenericType
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
    at org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleRequest$1(JarPlanHandler.java:100)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: GenericType
    at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:232)
    at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:223)
    at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:321)
    at org.apache.flink.streaming.api.scala.DataStream.keyBy(DataStream.scala:392)
    at org.example.Job$.main(Job.scala:99)
    at org.example.Job.main(Job.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    ... 9 more

What is happening, and more importantly how can I fix the problem?

TIA

Nick