Thanks, Fabian. Problem solved after implementing the Serializable interface in all the services of the stack, or marking as transient the ones that are not needed.
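For readers hitting the same error, here is a minimal plain-Java sketch of the fix described above. PriceService and its ReportClient dependency are hypothetical names. The service implements Serializable, and the dependency it cannot ship is marked transient. The round trip uses only java.io, not Flink, so it shows the JDK mechanics rather than Flink's own code path.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class TransientDemo {

    // Hypothetical dependency that does not implement Serializable
    // (think of a client holding a connection).
    static final class ReportClient {
    }

    // Hypothetical service: Serializable, with the dependency it cannot ship marked transient.
    // Without "transient", writing this object would fail with NotSerializableException.
    static final class PriceService implements Serializable {
        private static final long serialVersionUID = 1L;

        private final long customerId;               // written by Java serialization
        private transient ReportClient reportClient; // skipped by Java serialization

        PriceService(long customerId, ReportClient reportClient) {
            this.customerId = customerId;
            this.reportClient = reportClient;
        }

        long customerId() {
            return customerId;
        }

        ReportClient reportClient() {
            return reportClient;
        }
    }

    // Writes the object with ObjectOutputStream and reads it back.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        PriceService copy = roundTrip(new PriceService(42L, new ReportClient()));
        System.out.println(copy.customerId());           // prints 42
        System.out.println(copy.reportClient() == null); // prints true: transient fields come back null
    }
}
```

After deserialization the transient field is null, so code running on the task side must not assume it is set. In Flink, rich functions offer an open() hook where such a dependency could be recreated.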
Best,
Konstantinos

From: Fabian Hueske <fhue...@gmail.com>
Sent: Monday, 8 April 2019 11:37 AM
To: Papadopoulos, Konstantinos <konstantinos.papadopou...@iriworldwide.com>
Cc: Chesnay Schepler <ches...@apache.org>; user <user@flink.apache.org>
Subject: Re: InvalidProgramException when trying to sort a group within a dataset

Hi,

If you have a closer look at the exception, you'll see that the issue is caused by com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl not being serializable. It seems that you have a reference to this class somewhere. Flink requires that all function classes (like KeySelector) are serializable.

Best,
Fabian

On Mon, 8 Apr 2019 at 09:13, Papadopoulos, Konstantinos <konstantinos.papadopou...@iriworldwide.com> wrote:

Hi Fabian,

Thanks for your support. I updated my POJO to implement the Serializable interface, but with no success. I got the same NotSerializableException.

Best,
Konstantinos

From: Fabian Hueske <fhue...@gmail.com>
Sent: Saturday, 6 April 2019 2:26 AM
To: Papadopoulos, Konstantinos <konstantinos.papadopou...@iriworldwide.com>
Cc: Chesnay Schepler <ches...@apache.org>; user <user@flink.apache.org>
Subject: Re: InvalidProgramException when trying to sort a group within a dataset

Hi,

Your POJO should implement the Serializable interface. Otherwise it's not considered to be serializable.

Best,
Fabian

Papadopoulos, Konstantinos <konstantinos.papadopou...@iriworldwide.com> wrote on Wed, 3 Apr 2019, 07:22:

Hi Chesnay,

Thanks for your support.
ThresholdAcvFact is a simple POJO with the following definition:

public class ThresholdAcvFact {

    private Long timePeriodId;
    private Long geographyId;
    private Long productId;
    private Long customerId;
    private Double basePrice;
    private Double promoPrice;
    private Double basePriceAcv;
    private Double promoPriceAcv;
    private Long count;

    public Long getTimePeriodId() { return timePeriodId; }
    public void setTimePeriodId(Long timePeriodId) { this.timePeriodId = timePeriodId; }

    public Long getGeographyId() { return geographyId; }
    public void setGeographyId(Long geographyId) { this.geographyId = geographyId; }

    public Long getProductId() { return productId; }
    public void setProductId(Long productId) { this.productId = productId; }

    public Long getCustomerId() { return customerId; }
    public void setCustomerId(Long customerId) { this.customerId = customerId; }

    public Double getBasePrice() { return basePrice; }
    public void setBasePrice(Double basePrice) { this.basePrice = basePrice; }

    public Double getPromoPrice() { return promoPrice; }
    public void setPromoPrice(Double promoPrice) { this.promoPrice = promoPrice; }

    public Double getBasePriceAcv() { return basePriceAcv; }
    public void setBasePriceAcv(Double basePriceAcv) { this.basePriceAcv = basePriceAcv; }

    public Double getPromoPriceAcv() { return promoPriceAcv; }
    public void setPromoPriceAcv(Double promoPriceAcv) { this.promoPriceAcv = promoPriceAcv; }

    public Long getCount() { return count; }
    public void setCount(Long count) { this.count = count; }

    @Override
    public String toString() {
        return "ThresholdAcvFact{" +
                "timePeriodId=" + timePeriodId +
                ", geographyId=" + geographyId +
                ", productId=" + productId +
                ", customerId=" + customerId +
                ", basePrice=" + basePrice +
                ", promoPrice=" + promoPrice +
                ", basePriceAcv=" + basePriceAcv +
                ", promoPriceAcv=" + promoPriceAcv +
                ", count=" + count +
                '}';
    }
}

The implementation of the function in which we faced the reported issue is the following:

public DataSet<ThresholdAcvFact> transform(ThresholdAcvCalcSources thresholdAcvCalcSources, Long customerId) {
    final DataSet<ThresholdAcvFact> basePriceFacts = getPriceFacts(
            thresholdAcvCalcSources.getBasePriceDataSet(),
            thresholdAcvCalcSources.getIriMapStoreGeographyDataSet(),
            new ThresholdAcvBasePriceFactMapper(customerId));
    final DataSet<ThresholdAcvFact> promoPriceFacts = getPriceFacts(
            thresholdAcvCalcSources.getPromoPriceDataSet(),
            thresholdAcvCalcSources.getIriMapStoreGeographyDataSet(),
            new ThresholdAcvPromoPriceFactMapper(customerId));
    return basePriceFacts
            .fullOuterJoin(promoPriceFacts)
            .where(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID, "basePrice")
            .equalTo(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID, "promoPrice")
            .with(new ThresholdAcvFactBasePromoPriceJoiner())
            .groupBy(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID)
            .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
                @Override
                public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
                    return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
                }
            }, Order.ASCENDING)
            .reduceGroup(new ThresholdAcvFactCountGroupReducer());
}

Regards,
Konstantinos

From: Chesnay Schepler <ches...@apache.org>
Sent: Wednesday, 3 April 2019 12:59 PM
To: Papadopoulos, Konstantinos <konstantinos.papadopou...@iriworldwide.com>; user@flink.apache.org
Subject: Re: InvalidProgramException when trying to sort a group within a dataset

Your user-defined functions are referencing the class "com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl", which isn't serializable.

My guess is that "ThresholdAcvFact" is a non-static inner class; however, I would need to see the entire class to give an accurate analysis.
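Here is a minimal plain-Java sketch of the capture problem. CalcService is a hypothetical stand-in for ThresholdAcvCalcServiceImpl. PriceKeySelector is a hypothetical interface that, like Flink's KeySelector, extends Serializable. An anonymous class created inside an instance method keeps a hidden reference to the enclosing object, so serializing it also tries to serialize the service. A static nested class that copies the values it needs does not have this problem. In the thread's code the anonymous KeySelector reads no field. Even so, the Java 8 compiler the thread appears to use (note the sun.reflect frames) still gives it a reference to the enclosing instance. The sketch reads a hypothetical fallbackPrice field so that the capture does not depend on the compiler version. The check uses a plain ObjectOutputStream, not Flink's own serialization utilities.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Optional;

final class CaptureDemo {

    // Hypothetical stand-in for Flink's KeySelector, which also extends Serializable.
    interface PriceKeySelector extends Serializable {
        Double getKey(Double basePrice, Double promoPrice);
    }

    // Hypothetical service that does NOT implement Serializable.
    static class CalcService {
        // Hypothetical fallback used when both prices are missing.
        private final Double fallbackPrice = 0.0;

        // Anonymous inner class: reading fallbackPrice goes through CalcService.this,
        // so serializing the selector also tries to serialize the service.
        PriceKeySelector anonymousSelector() {
            return new PriceKeySelector() {
                @Override
                public Double getKey(Double basePrice, Double promoPrice) {
                    return Optional.ofNullable(basePrice)
                            .orElse(Optional.ofNullable(promoPrice).orElse(fallbackPrice));
                }
            };
        }

        // Static nested class: holds its own copy of the value and no reference to the service.
        static final class CoalescedPriceSelector implements PriceKeySelector {
            private static final long serialVersionUID = 1L;
            private final Double fallbackPrice;

            CoalescedPriceSelector(Double fallbackPrice) {
                this.fallbackPrice = fallbackPrice;
            }

            @Override
            public Double getKey(Double basePrice, Double promoPrice) {
                return Optional.ofNullable(basePrice)
                        .orElse(Optional.ofNullable(promoPrice).orElse(fallbackPrice));
            }
        }

        PriceKeySelector staticSelector() {
            return new CoalescedPriceSelector(fallbackPrice);
        }
    }

    // Returns true if plain Java serialization of the object succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // NotSerializableException is an IOException
            return false;
        }
    }

    public static void main(String[] args) {
        CalcService service = new CalcService();
        // prints "anonymous selector serializable: false"
        System.out.println("anonymous selector serializable: " + isSerializable(service.anonymousSelector()));
        // prints "static nested selector serializable: true"
        System.out.println("static nested selector serializable: " + isSerializable(service.staticSelector()));
    }
}
```

A lambda that reads an instance field would capture this as well, so a static nested (or top-level) class is the more explicit way to keep the service out of the function's serialized form.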
On 02/04/2019 12:31, Papadopoulos, Konstantinos wrote:

Hi all,

I am trying to sort a group within a dataset using a KeySelector as follows:

in
    .groupBy("productId", "timePeriodId", "geographyId")
    .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
        @Override
        public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
            return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
        }
    }, Order.ASCENDING)
    .reduceGroup(/* do something */)

And I am getting the following exception:

org.apache.flink.api.common.InvalidProgramException: KeySelector group-sorting keys can only be used with KeySelector grouping keys.
    at org.apache.flink.api.java.operators.UnsortedGrouping.sortGroup(UnsortedGrouping.java:318)
    at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl.transform(ThresholdAcvCalcServiceImpl.java:91)
    at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceTest.transform(ThresholdAcvCalcServiceTest.java:91)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Then I tried to use a KeySelector for both the 'groupBy' and 'sortGroup' transformations, as follows:

in
    .groupBy(new KeySelector<ThresholdAcvFact, Tuple3<Long, Long, Long>>() {
        @Override
        public Tuple3<Long, Long, Long> getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
            return new Tuple3<>(thresholdAcvFact.getProductId(), thresholdAcvFact.getTimePeriodId(), thresholdAcvFact.getGeographyId());
        }
    })
    .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
        @Override
        public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
            return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
        }
    }, Order.ASCENDING)
    .reduceGroup(/* do something */)

The job execution still failed, with the following exception:

org.apache.flink.optimizer.CompilerException: Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:429)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:116)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:203)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:221)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
    at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceTest.transform(ThresholdAcvCalcServiceTest.java:95)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:281)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:897)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:374)
    ... 50 more
Caused by: java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
    at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:463)
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:279)
    ... 52 more

Does anyone have any idea how I can overcome such issues?

Thanks in advance
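To make the intended grouping concrete, here is a plain-Java sketch with no Flink involved. It shows what grouping by productId, timePeriodId and geographyId should produce before reduceGroup sees each group, with each group sorted ascending by base price, falling back to the promo price. Fact is a trimmed-down, hypothetical stand-in for ThresholdAcvFact. Placing facts with no price at all last is an assumption of this sketch. It does not model how Flink itself handles a null sort key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class GroupSortSketch {

    // Hypothetical, trimmed-down stand-in for ThresholdAcvFact.
    static final class Fact {
        final Long productId;
        final Long timePeriodId;
        final Long geographyId;
        final Double basePrice;
        final Double promoPrice;

        Fact(Long productId, Long timePeriodId, Long geographyId, Double basePrice, Double promoPrice) {
            this.productId = productId;
            this.timePeriodId = timePeriodId;
            this.geographyId = geographyId;
            this.basePrice = basePrice;
            this.promoPrice = promoPrice;
        }
    }

    // Composite grouping key, mirroring the Tuple3 KeySelector in the thread.
    static List<Long> groupKey(Fact f) {
        return Arrays.asList(f.productId, f.timePeriodId, f.geographyId);
    }

    // Sort key: base price if present, otherwise promo price (may still be null).
    static Double sortKey(Fact f) {
        return Optional.ofNullable(f.basePrice).orElse(f.promoPrice);
    }

    // Groups facts by composite key (first-seen order) and sorts each group ascending.
    static Map<List<Long>, List<Fact>> groupAndSort(List<Fact> facts) {
        Map<List<Long>, List<Fact>> groups = new LinkedHashMap<>();
        for (Fact f : facts) {
            groups.computeIfAbsent(groupKey(f), k -> new ArrayList<>()).add(f);
        }
        // Assumption of this sketch: facts without any price sort last.
        Comparator<Fact> byPrice = Comparator.comparing(
                GroupSortSketch::sortKey, Comparator.nullsLast(Comparator.naturalOrder()));
        groups.values().forEach(group -> group.sort(byPrice));
        return groups;
    }

    public static void main(String[] args) {
        List<Fact> facts = Arrays.asList(
                new Fact(1L, 10L, 100L, 5.0, null),
                new Fact(1L, 10L, 100L, null, 3.0),
                new Fact(2L, 10L, 100L, 7.0, null));
        groupAndSort(facts).forEach((key, group) -> {
            StringBuilder line = new StringBuilder(key.toString()).append(" ->");
            for (Fact f : group) {
                line.append(' ').append(sortKey(f));
            }
            System.out.println(line);
        });
    }
}
```

Running main prints "[1, 10, 100] -> 3.0 5.0" and then "[2, 10, 100] -> 7.0". The two facts that share the composite key form one group, ordered by their coalesced price.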