Hi Stefan,

Thanks. I am using 1.6.0. I will upgrade to 1.6.1 and see whether the problem 
remains.

Best regards/祝好,

Chang Liu 刘畅


> On 22 Jan 2019, at 16:10, Stefan Richter <s.rich...@da-platform.com> wrote:
> 
> Hi,
> 
> Which version of Flink are you using? This issue 
> https://issues.apache.org/jira/browse/FLINK-10283 shows that a similar 
> problem was fixed in 1.6.1 and 1.7. If you use a newer version and still 
> encounter the problem, you can reopen the issue and describe how this is 
> still a problem for you.
> 
> Best,
> Stefan 
> 
>> On 22. Jan 2019, at 13:49, Chang Liu <fluency...@gmail.com> wrote:
>> 
>> I have tried another way; it is not working either:
>> 
>> def evaluationStream(
>>     indicatorsStream: DataStream[Indicators],
>>     scenarios: Set[Scenario]): DataStream[Evaluation] =
>>   indicatorsStream.map(new IndicatorsToTxEval(scenarios))
>> 
>> class IndicatorsToTxEval(
>>     scenarios: Set[Scenario])
>>   extends MapFunction[Indicators, Evaluation] {
>> 
>>   override def map(inds: Indicators): Evaluation =
>>     Evaluation(inds.id, evaluateScenarios(inds, scenarios))
>> }
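>> 
>> A minimal sketch of another approach I could try, assuming Scenario itself 
>> is not serializable: pass only serializable configuration into the function 
>> (here a Seq[String] of scenario names, just a made-up choice) and rebuild 
>> the Set in open() of a RichMapFunction, so nothing non-serializable is 
>> captured when the job graph is built. The helper loadScenariosByName is 
>> hypothetical as well.
>> 
>> import org.apache.flink.api.common.functions.RichMapFunction
>> import org.apache.flink.configuration.Configuration
>> 
>> class IndicatorsToTxEval(scenarioNames: Seq[String])
>>   extends RichMapFunction[Indicators, Evaluation] {
>> 
>>   // rebuilt on each TaskManager in open(), never shipped with the closure
>>   @transient private var scenarios: Set[Scenario] = _
>> 
>>   override def open(parameters: Configuration): Unit =
>>     scenarios = loadScenariosByName(scenarioNames)  // hypothetical loader
>> 
>>   override def map(inds: Indicators): Evaluation =
>>     Evaluation(inds.id, evaluateScenarios(inds, scenarios))
>> }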
>> 
>> Best regards/祝好,
>> 
>> Chang Liu 刘畅
>> 
>> 
>>> On 22 Jan 2019, at 13:33, Chang Liu <fluency...@gmail.com> wrote:
>>> 
>>> Ok, I think I found where the issue is, but I don’t understand why.
>>> 
>>> I have a method:
>>> 
>>> def evaluationStream(
>>>     indicatorsStream: DataStream[Indicators],
>>>     scenarios: Set[Scenario]): DataStream[Evaluation] =
>>>   indicatorsStream.map { indicators =>
>>>     Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
>>>   }
>>> 
>>> And this is how I am using it:
>>> 
>>> lazy val indicatorsStream: DataStream[Indicators] = ...
>>> 
>>> lazy val scenarios: Set[Scenario] = loadScenarios(...)
>>> 
>>> lazy val evalStream: DataStream[Evaluation] =
>>>   evaluationStream(indicatorsStream, scenarios)
>>> 
>>> evalStream.print()
>>> 
>>> 
>>> The problem is caused by scenarios, which is passed as an argument to the 
>>> method evaluationStream. With scenarios passed this way, it is not working.
>>> 
>>> It will work if I do it in the following way:
>>> 
>>> lazy val scenarios: Set[Scenario] = Set(S1, S2, ...)
>>> 
>>> def evaluationStream(indicatorsStream: DataStream[Indicators]): 
>>> DataStream[Evaluation] =
>>>   indicatorsStream.map { indicators =>
>>>     Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
>>>   }
>>> 
>>> where scenarios is not passed as a method argument but is a static object 
>>> variable.
>>> 
>>> But this is not what I want: I would like to have configurable scenarios 
>>> that I can load from a config file instead of a static object variable.
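>>> 
>>> A minimal sketch of what I have in mind, assuming Scenario can be modelled 
>>> as a case class (case classes are Serializable by default), so a Set loaded 
>>> from a file on the client side can still be captured by the map closure; 
>>> the file format (one "name,threshold" pair per line) is just a made-up 
>>> example:
>>> 
>>> import scala.io.Source
>>> 
>>> // Serializable by construction, so it can travel inside the closure
>>> final case class Scenario(name: String, threshold: Double)
>>> 
>>> def loadScenarios(path: String): Set[Scenario] = {
>>>   val src = Source.fromFile(path)
>>>   try {
>>>     src.getLines().map { line =>
>>>       val Array(name, threshold) = line.split(",")
>>>       Scenario(name.trim, threshold.trim.toDouble)
>>>     }.toSet
>>>   } finally src.close()
>>> }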
>>> 
>>> Any idea why this is happening? I also have other code where I pass 
>>> arguments and use them as part of my data flow, and it works just fine.
>>> 
>>> Many Thanks.
>>> 
>>> Best regards/祝好,
>>> 
>>> Chang Liu 刘畅
>>> 
>>> 
>>>> On 22 Jan 2019, at 10:47, Chang Liu <fluency...@gmail.com> wrote:
>>>> 
>>>> Dear community,
>>>> 
>>>> I am having a problem releasing the job.
>>>> 
>>>> 2019-01-22 10:42:50.098  WARN [Source: Custom Source -> Kafka -> 
>>>> ConstructTxSepa -> FilterOutFailures -> ObtainActualTxSepa -> TxSepaStream 
>>>> -> TxStream -> IndicatorsEvalStream -> TxEvalStream -> Sink: Print to Std. 
>>>> Out (2/4)] [FileCache] - improper use of releaseJob() without a matching 
>>>> number of createTmpFiles() calls for jobId 9e1557723a065925e01c7749899547fb
>>>>  
>>>> I searched online but only found this: 
>>>> https://stackoverflow.com/questions/52135604/fix-improper-use-of-releasejob-without-a-matching-number-of-createtmpfiles
>>>> 
>>>> However, these warnings keep popping up and the job cannot be released, so 
>>>> my data flow is not working.
>>>> 
>>>> If I remove my last operator, it works just fine. But that last operator is 
>>>> just doing a map operation. I am wondering what could be the cause of this 
>>>> issue?
>>>> 
>>>> Many Thanks :)
>>>> 
>>>> Best regards/祝好,
>>>> 
>>>> Chang Liu 刘畅
>>>> 
>>>> 
>>> 
>> 
> 
