Hi Stefan,

Thanks. I am using 1.6.0. I will upgrade to 1.6.1 and see whether the problem remains.
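For reference, here is a minimal sketch of the class-based variant quoted below, reduced so that it runs without Flink. The definitions of Indicators, Scenario, Evaluation and evaluateScenarios are placeholders, since the real ones are not shown in this thread, and MapFn is a hypothetical stand-in for Flink's MapFunction. The sketch only checks that a function object carrying scenarios as a constructor argument survives a Java serialization round trip, which is how Flink ships user functions to the task managers. It does not reproduce the FileCache warning, and whether serialization plays any role in this problem is an assumption.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Placeholder domain types; the real definitions are not shown in the thread.
final case class Indicators(id: String, amount: Double)
final case class Scenario(name: String, threshold: Double)
final case class Evaluation(id: String, triggered: Set[String])

// Hypothetical stand-in for Flink's MapFunction, so the sketch runs without Flink.
trait MapFn[A, B] extends Serializable {
  def map(value: A): B
}

object Evaluator {
  // Placeholder logic: a scenario triggers when the amount exceeds its threshold.
  def evaluateScenarios(indicators: Indicators, scenarios: Set[Scenario]): Set[String] =
    scenarios.filter(s => indicators.amount > s.threshold).map(_.name)
}

// scenarios is a constructor argument used inside map, so it becomes a field
// and is serialized together with the function object.
class IndicatorsToTxEval(scenarios: Set[Scenario]) extends MapFn[Indicators, Evaluation] {
  override def map(indicators: Indicators): Evaluation =
    Evaluation(indicators.id, Evaluator.evaluateScenarios(indicators, scenarios))
}

object SerializationCheck {
  // Round-trips an object through Java serialization and returns the copy.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val scenarios = Set(Scenario("high-amount", 1000.0), Scenario("any-amount", 0.0))
    val restored = roundTrip(new IndicatorsToTxEval(scenarios))
    // The restored copy still evaluates against the scenarios it was built with.
    println(restored.map(Indicators("tx-1", 1500.0)))
  }
}
```

If the round trip throws a NotSerializableException with the real Scenario type, that would point to the captured argument rather than to the FileCache.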

Best regards/祝好,

Chang Liu 刘畅

> On 22 Jan 2019, at 16:10, Stefan Richter <s.rich...@da-platform.com> wrote:
>
> Hi,
>
> Which version of Flink are you using? This issue
> https://issues.apache.org/jira/browse/FLINK-10283
> shows that a similar problem was fixed in 1.6.1 and 1.7. If you use a newer
> version and still encounter the problem, you can reopen the issue and
> describe how this is still a problem for you.
>
> Best,
> Stefan
>
>> On 22. Jan 2019, at 13:49, Chang Liu <fluency...@gmail.com> wrote:
>>
>> I have tried another way, but it does not work either:
>>
>> def evaluationStream(
>>     indicatorsStream: DataStream[Indicators],
>>     scenarios: Set[Scenario]): DataStream[Evaluation] =
>>   indicatorsStream.map(new IndicatorsToTxEval(scenarios))
>>
>> class IndicatorsToTxEval(scenarios: Set[Scenario])
>>     extends MapFunction[Indicators, Evaluation] {
>>
>>   override def map(indicators: Indicators): Evaluation =
>>     Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
>> }
>>
>> Best regards/祝好,
>>
>> Chang Liu 刘畅
>>
>>> On 22 Jan 2019, at 13:33, Chang Liu <fluency...@gmail.com> wrote:
>>>
>>> OK, I think I found where the issue is, but I don't understand why.
>>>
>>> I have a method:
>>>
>>> def evaluationStream(
>>>     indicatorsStream: DataStream[Indicators],
>>>     scenarios: Set[Scenario]): DataStream[Evaluation] =
>>>   indicatorsStream.map { indicators =>
>>>     Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
>>>   }
>>>
>>> And this is how I am using it:
>>>
>>> lazy val indicatorsStream: DataStream[Indicators] = ...
>>>
>>> lazy val scenarios: Set[Scenario] = loadScenarios(...)
>>>
>>> lazy val evalStream: DataStream[Evaluation] =
>>>   evaluationStream(indicatorsStream, scenarios)
>>>
>>> evalStream.print()
>>>
>>> The problem is caused by scenarios, which is passed as an argument to the
>>> method evaluationStream. Passed that way, it does not work.
>>>
>>> It works if I do it in the following way:
>>>
>>> lazy val scenarios: Set[Scenario] = Set(S1, S2, ...)
>>>
>>> def evaluationStream(
>>>     indicatorsStream: DataStream[Indicators]): DataStream[Evaluation] =
>>>   indicatorsStream.map { indicators =>
>>>     Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
>>>   }
>>>
>>> where scenarios is not passed as a method argument but is a static
>>> object variable.
>>>
>>> But this is not what I want. I would like scenarios to be configurable, so
>>> that I can load it from a config file instead of using a static object
>>> variable.
>>>
>>> Any idea why this is happening? I have other code where I also pass
>>> arguments and use them as part of my data flow, and it works fine.
>>>
>>> Many thanks.
>>>
>>> Best regards/祝好,
>>>
>>> Chang Liu 刘畅
>>>
>>>> On 22 Jan 2019, at 10:47, Chang Liu <fluency...@gmail.com> wrote:
>>>>
>>>> Dear community,
>>>>
>>>> I am having a problem releasing the job.
>>>>
>>>> 2019-01-22 10:42:50.098 WARN [Source: Custom Source -> Kafka ->
>>>> ConstructTxSepa -> FilterOutFailures -> ObtainActualTxSepa -> TxSepaStream
>>>> -> TxStream -> IndicatorsEvalStream -> TxEvalStream -> Sink: Print to Std.
>>>> Out (2/4)] [FileCache] - improper use of releaseJob() without a matching
>>>> number of createTmpFiles() calls for jobId 9e1557723a065925e01c7749899547fb
>>>>
>>>> I searched online but only found this:
>>>> https://stackoverflow.com/questions/52135604/fix-improper-use-of-releasejob-without-a-matching-number-of-createtmpfiles
>>>>
>>>> However, these warnings keep popping up and the job cannot be released,
>>>> so my data flow is not working.
>>>>
>>>> If I remove my last operator, it works just fine, but that operator is
>>>> just doing a map operation. What could be the cause of this issue?
>>>>
>>>> Many thanks :)
>>>>
>>>> Best regards/祝好,
>>>>
>>>> Chang Liu 刘畅