Re: improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId

2019-01-22 Thread Chang Liu
Hi Stefan,

I have upgraded to 1.6.1. I saw the warnings are gone but my issue remains: the 
scenarios: Set[Scenario] cannot be passed as a method argument in order to be 
used in the map function.

But it is working if I just directly use the object variable scenarios: 
Set[Scenario] instead of passing it as a method argument.

Does it have anything to do with the class Scenario?

Many Thanks.

Best regards/祝好,

Chang Liu 刘畅


> On 22 Jan 2019, at 16:34, Chang Liu  wrote:
> 
> Hi Stefan,
> 
> Thanks. I am using 1.6.0. I will upgrade to 1.6.1 and see whether the problem 
> remains.
> 
> Best regards/祝好,
> 
> Chang Liu 刘畅
> 
> 
>> On 22 Jan 2019, at 16:10, Stefan Richter > > wrote:
>> 
>> Hi,
>> 
>> Which version of Flink are you using? This issue 
>> https://issues.apache.org/jira/browse/FLINK-10283 
>>  shows that a similar 
>> problem was fixed in 1.6.1 and 1.7. If you use a newer version and still 
>> encounter the problem, you can reopen the issue and describe how this is 
>> still a problem for you.
>> 
>> Best,
>> Stefan 
>> 
>>> On 22. Jan 2019, at 13:49, Chang Liu >> > wrote:
>>> 
>>> I have tried another way, it is not working as well:
>>> 
>>> def evaluationStream(
>>> indicatorsStream: DataStream[Indicators],
>>> scenarios: Set[Scenario]): DataStream[Evaluation] =
>>>   indicatorsStream.map(new IndicatorsToTxEval(scenarios))
>>> 
>>> class IndicatorsToTxEval(
>>> scenarios: Set[Scenario])
>>>   extends MapFunction[Indicators, Evaluation] {
>>> 
>>>   override def map(inds: Indicators): Evaluation =
>>> Evaluation(indicators.id , 
>>> evaluateScenarios(indicators, scenarios))
>>> }
>>> 
>>> Best regards/祝好,
>>> 
>>> Chang Liu 刘畅
>>> 
>>> 
 On 22 Jan 2019, at 13:33, Chang Liu >>> > wrote:
 
 Ok, I think I found where is the issue, but I don’t understand why.
 
 I have a method:
 
 def evaluationStream(
 indicatorsStream: DataStream[Indicators],
 scenarios: Set[Scenario]): DataStream[Evaluation] =
   indicatorsStream.map { indicators =>
 Evaluation(indicators.id , 
 evaluateScenarios(indicators, scenarios))
   }
 
 And this is how I am using it:
 
 lazy indicatorsStream: DataStream[Indicators] = ...
 
 lazy val scenarios: Set[Scenario] = loadScenarios(...)
 
 lazy val evalStream: DataStream[Evaluation] = 
 evaluationStream(indicatorsStream, scenarios).print()
 
 
 The problem is caused by the scenarios, which is passed as an argument of 
 the method evaluationStream. But is is not working.
 
 It will work if I do it in the following way:
 
 lazy val scenarios: Set[Scenario] = Set(S1, S2, ...)
 
 def evaluationStream(indicatorsStream: DataStream[Indicators]): 
 DataStream[Evaluation] =
   indicatorsStream.map { indicators =>
 Evaluation(indicators.id , 
 evaluateScenarios(indicators, scenarios))
   }
 
 where the scenarios is not passed as a method argument but is a static 
 object variable.
 
 But this is not what I want, I would like to have a configurable scenarios 
 which I can load from config file instead of a static object variable.
 
 Any idea why this is happening? I also have other codes where I am also 
 passing arguments and use them as part of my data flow and they are just 
 working fine.
 
 Many Thanks.
 
 Best regards/祝好,
 
 Chang Liu 刘畅
 
 
> On 22 Jan 2019, at 10:47, Chang Liu  > wrote:
> 
> Dear community,
> 
> I am having a problem releasing the job.
> 
> 2019-01-22 10:42:50.098  WARN [Source: Custom Source -> Kafka -> 
> ConstructTxSepa -> FilterOutFailures -> ObtainActualTxSepa -> 
> TxSepaStream -> TxStream -> IndicatorsEvalStream -> TxEvalStream -> Sink: 
> Print to Std. Out (2/4)] [FileCache] - improper use of releaseJob() 
> without a matching number of createTmpFiles() calls for jobId 
> 9e1557723a065925e01c7749899547fb
>  
> I searched online but only found this: 
> https://stackoverflow.com/questions/52135604/fix-improper-use-of-releasejob-without-a-matching-number-of-createtmpfiles
>  
> 
> 
> However, this warnings are keeping popping up and the job cannot be 
> released so that my data flow is not working.
> 
> But if I remove my last operator, it will work just fine. But my last 
> operator is justing doing some map operation. I am wondering what could 
> be the cause of this issue?
> 
> Many Thanks :)
> 
> Best regards/祝好,
> 
> Chang Liu 刘畅

Re: improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId

2019-01-22 Thread Chang Liu
Hi Stefan,

Thanks. I am using 1.6.0. I will upgrade to 1.6.1 and see whether the problem 
remains.

Best regards/祝好,

Chang Liu 刘畅


> On 22 Jan 2019, at 16:10, Stefan Richter  wrote:
> 
> Hi,
> 
> Which version of Flink are you using? This issue 
> https://issues.apache.org/jira/browse/FLINK-10283 
>  shows that a similar 
> problem was fixed in 1.6.1 and 1.7. If you use a newer version and still 
> encounter the problem, you can reopen the issue and describe how this is 
> still a problem for you.
> 
> Best,
> Stefan 
> 
>> On 22. Jan 2019, at 13:49, Chang Liu > > wrote:
>> 
>> I have tried another way, it is not working as well:
>> 
>> def evaluationStream(
>> indicatorsStream: DataStream[Indicators],
>> scenarios: Set[Scenario]): DataStream[Evaluation] =
>>   indicatorsStream.map(new IndicatorsToTxEval(scenarios))
>> 
>> class IndicatorsToTxEval(
>> scenarios: Set[Scenario])
>>   extends MapFunction[Indicators, Evaluation] {
>> 
>>   override def map(inds: Indicators): Evaluation =
>> Evaluation(indicators.id , 
>> evaluateScenarios(indicators, scenarios))
>> }
>> 
>> Best regards/祝好,
>> 
>> Chang Liu 刘畅
>> 
>> 
>>> On 22 Jan 2019, at 13:33, Chang Liu >> > wrote:
>>> 
>>> Ok, I think I found where is the issue, but I don’t understand why.
>>> 
>>> I have a method:
>>> 
>>> def evaluationStream(
>>> indicatorsStream: DataStream[Indicators],
>>> scenarios: Set[Scenario]): DataStream[Evaluation] =
>>>   indicatorsStream.map { indicators =>
>>> Evaluation(indicators.id , 
>>> evaluateScenarios(indicators, scenarios))
>>>   }
>>> 
>>> And this is how I am using it:
>>> 
>>> lazy indicatorsStream: DataStream[Indicators] = ...
>>> 
>>> lazy val scenarios: Set[Scenario] = loadScenarios(...)
>>> 
>>> lazy val evalStream: DataStream[Evaluation] = 
>>> evaluationStream(indicatorsStream, scenarios).print()
>>> 
>>> 
>>> The problem is caused by the scenarios, which is passed as an argument of 
>>> the method evaluationStream. But is is not working.
>>> 
>>> It will work if I do it in the following way:
>>> 
>>> lazy val scenarios: Set[Scenario] = Set(S1, S2, ...)
>>> 
>>> def evaluationStream(indicatorsStream: DataStream[Indicators]): 
>>> DataStream[Evaluation] =
>>>   indicatorsStream.map { indicators =>
>>> Evaluation(indicators.id , 
>>> evaluateScenarios(indicators, scenarios))
>>>   }
>>> 
>>> where the scenarios is not passed as a method argument but is a static 
>>> object variable.
>>> 
>>> But this is not what I want, I would like to have a configurable scenarios 
>>> which I can load from config file instead of a static object variable.
>>> 
>>> Any idea why this is happening? I also have other codes where I am also 
>>> passing arguments and use them as part of my data flow and they are just 
>>> working fine.
>>> 
>>> Many Thanks.
>>> 
>>> Best regards/祝好,
>>> 
>>> Chang Liu 刘畅
>>> 
>>> 
 On 22 Jan 2019, at 10:47, Chang Liu >>> > wrote:
 
 Dear community,
 
 I am having a problem releasing the job.
 
 2019-01-22 10:42:50.098  WARN [Source: Custom Source -> Kafka -> 
 ConstructTxSepa -> FilterOutFailures -> ObtainActualTxSepa -> TxSepaStream 
 -> TxStream -> IndicatorsEvalStream -> TxEvalStream -> Sink: Print to Std. 
 Out (2/4)] [FileCache] - improper use of releaseJob() without a matching 
 number of createTmpFiles() calls for jobId 9e1557723a065925e01c7749899547fb
  
 I searched online but only found this: 
 https://stackoverflow.com/questions/52135604/fix-improper-use-of-releasejob-without-a-matching-number-of-createtmpfiles
  
 
 
 However, this warnings are keeping popping up and the job cannot be 
 released so that my data flow is not working.
 
 But if I remove my last operator, it will work just fine. But my last 
 operator is justing doing some map operation. I am wondering what could be 
 the cause of this issue?
 
 Many Thanks :)
 
 Best regards/祝好,
 
 Chang Liu 刘畅
 
 
>>> 
>> 
> 



Re: improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId

2019-01-22 Thread Stefan Richter
Hi,

Which version of Flink are you using? This issue 
https://issues.apache.org/jira/browse/FLINK-10283 
 shows that a similar 
problem was fixed in 1.6.1 and 1.7. If you use a newer version and still 
encounter the problem, you can reopen the issue and describe how this is still 
a problem for you.

Best,
Stefan 

> On 22. Jan 2019, at 13:49, Chang Liu  wrote:
> 
> I have tried another way, it is not working as well:
> 
> def evaluationStream(
> indicatorsStream: DataStream[Indicators],
> scenarios: Set[Scenario]): DataStream[Evaluation] =
>   indicatorsStream.map(new IndicatorsToTxEval(scenarios))
> 
> class IndicatorsToTxEval(
> scenarios: Set[Scenario])
>   extends MapFunction[Indicators, Evaluation] {
> 
>   override def map(inds: Indicators): Evaluation =
> Evaluation(indicators.id , 
> evaluateScenarios(indicators, scenarios))
> }
> 
> Best regards/祝好,
> 
> Chang Liu 刘畅
> 
> 
>> On 22 Jan 2019, at 13:33, Chang Liu > > wrote:
>> 
>> Ok, I think I found where is the issue, but I don’t understand why.
>> 
>> I have a method:
>> 
>> def evaluationStream(
>> indicatorsStream: DataStream[Indicators],
>> scenarios: Set[Scenario]): DataStream[Evaluation] =
>>   indicatorsStream.map { indicators =>
>> Evaluation(indicators.id , 
>> evaluateScenarios(indicators, scenarios))
>>   }
>> 
>> And this is how I am using it:
>> 
>> lazy indicatorsStream: DataStream[Indicators] = ...
>> 
>> lazy val scenarios: Set[Scenario] = loadScenarios(...)
>> 
>> lazy val evalStream: DataStream[Evaluation] = 
>> evaluationStream(indicatorsStream, scenarios).print()
>> 
>> 
>> The problem is caused by the scenarios, which is passed as an argument of 
>> the method evaluationStream. But is is not working.
>> 
>> It will work if I do it in the following way:
>> 
>> lazy val scenarios: Set[Scenario] = Set(S1, S2, ...)
>> 
>> def evaluationStream(indicatorsStream: DataStream[Indicators]): 
>> DataStream[Evaluation] =
>>   indicatorsStream.map { indicators =>
>> Evaluation(indicators.id , 
>> evaluateScenarios(indicators, scenarios))
>>   }
>> 
>> where the scenarios is not passed as a method argument but is a static 
>> object variable.
>> 
>> But this is not what I want, I would like to have a configurable scenarios 
>> which I can load from config file instead of a static object variable.
>> 
>> Any idea why this is happening? I also have other codes where I am also 
>> passing arguments and use them as part of my data flow and they are just 
>> working fine.
>> 
>> Many Thanks.
>> 
>> Best regards/祝好,
>> 
>> Chang Liu 刘畅
>> 
>> 
>>> On 22 Jan 2019, at 10:47, Chang Liu >> > wrote:
>>> 
>>> Dear community,
>>> 
>>> I am having a problem releasing the job.
>>> 
>>> 2019-01-22 10:42:50.098  WARN [Source: Custom Source -> Kafka -> 
>>> ConstructTxSepa -> FilterOutFailures -> ObtainActualTxSepa -> TxSepaStream 
>>> -> TxStream -> IndicatorsEvalStream -> TxEvalStream -> Sink: Print to Std. 
>>> Out (2/4)] [FileCache] - improper use of releaseJob() without a matching 
>>> number of createTmpFiles() calls for jobId 9e1557723a065925e01c7749899547fb
>>>  
>>> I searched online but only found this: 
>>> https://stackoverflow.com/questions/52135604/fix-improper-use-of-releasejob-without-a-matching-number-of-createtmpfiles
>>>  
>>> 
>>> 
>>> However, this warnings are keeping popping up and the job cannot be 
>>> released so that my data flow is not working.
>>> 
>>> But if I remove my last operator, it will work just fine. But my last 
>>> operator is justing doing some map operation. I am wondering what could be 
>>> the cause of this issue?
>>> 
>>> Many Thanks :)
>>> 
>>> Best regards/祝好,
>>> 
>>> Chang Liu 刘畅
>>> 
>>> 
>> 
> 



Re: improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId

2019-01-22 Thread Chang Liu
I have tried another way, it is not working as well:

def evaluationStream(
indicatorsStream: DataStream[Indicators],
scenarios: Set[Scenario]): DataStream[Evaluation] =
  indicatorsStream.map(new IndicatorsToTxEval(scenarios))

class IndicatorsToTxEval(
scenarios: Set[Scenario])
  extends MapFunction[Indicators, Evaluation] {

  override def map(inds: Indicators): Evaluation =
Evaluation(indicators.id , 
evaluateScenarios(indicators, scenarios))
}

Best regards/祝好,

Chang Liu 刘畅


> On 22 Jan 2019, at 13:33, Chang Liu  wrote:
> 
> Ok, I think I found where is the issue, but I don’t understand why.
> 
> I have a method:
> 
> def evaluationStream(
> indicatorsStream: DataStream[Indicators],
> scenarios: Set[Scenario]): DataStream[Evaluation] =
>   indicatorsStream.map { indicators =>
> Evaluation(indicators.id , 
> evaluateScenarios(indicators, scenarios))
>   }
> 
> And this is how I am using it:
> 
> lazy indicatorsStream: DataStream[Indicators] = ...
> 
> lazy val scenarios: Set[Scenario] = loadScenarios(...)
> 
> lazy val evalStream: DataStream[Evaluation] = 
> evaluationStream(indicatorsStream, scenarios).print()
> 
> 
> The problem is caused by the scenarios, which is passed as an argument of the 
> method evaluationStream. But is is not working.
> 
> It will work if I do it in the following way:
> 
> lazy val scenarios: Set[Scenario] = Set(S1, S2, ...)
> 
> def evaluationStream(indicatorsStream: DataStream[Indicators]): 
> DataStream[Evaluation] =
>   indicatorsStream.map { indicators =>
> Evaluation(indicators.id , 
> evaluateScenarios(indicators, scenarios))
>   }
> 
> where the scenarios is not passed as a method argument but is a static object 
> variable.
> 
> But this is not what I want, I would like to have a configurable scenarios 
> which I can load from config file instead of a static object variable.
> 
> Any idea why this is happening? I also have other codes where I am also 
> passing arguments and use them as part of my data flow and they are just 
> working fine.
> 
> Many Thanks.
> 
> Best regards/祝好,
> 
> Chang Liu 刘畅
> 
> 
>> On 22 Jan 2019, at 10:47, Chang Liu > > wrote:
>> 
>> Dear community,
>> 
>> I am having a problem releasing the job.
>> 
>> 2019-01-22 10:42:50.098  WARN [Source: Custom Source -> Kafka -> 
>> ConstructTxSepa -> FilterOutFailures -> ObtainActualTxSepa -> TxSepaStream 
>> -> TxStream -> IndicatorsEvalStream -> TxEvalStream -> Sink: Print to Std. 
>> Out (2/4)] [FileCache] - improper use of releaseJob() without a matching 
>> number of createTmpFiles() calls for jobId 9e1557723a065925e01c7749899547fb
>>  
>> I searched online but only found this: 
>> https://stackoverflow.com/questions/52135604/fix-improper-use-of-releasejob-without-a-matching-number-of-createtmpfiles
>>  
>> 
>> 
>> However, this warnings are keeping popping up and the job cannot be released 
>> so that my data flow is not working.
>> 
>> But if I remove my last operator, it will work just fine. But my last 
>> operator is justing doing some map operation. I am wondering what could be 
>> the cause of this issue?
>> 
>> Many Thanks :)
>> 
>> Best regards/祝好,
>> 
>> Chang Liu 刘畅
>> 
>> 
> 



Re: improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId

2019-01-22 Thread Chang Liu
Ok, I think I found where is the issue, but I don’t understand why.

I have a method:

def evaluationStream(
indicatorsStream: DataStream[Indicators],
scenarios: Set[Scenario]): DataStream[Evaluation] =
  indicatorsStream.map { indicators =>
Evaluation(indicators.id , 
evaluateScenarios(indicators, scenarios))
  }

And this is how I am using it:

lazy indicatorsStream: DataStream[Indicators] = ...

lazy val scenarios: Set[Scenario] = loadScenarios(...)

lazy val evalStream: DataStream[Evaluation] = 
evaluationStream(indicatorsStream, scenarios).print()


The problem is caused by the scenarios, which is passed as an argument of the 
method evaluationStream. But is is not working.

It will work if I do it in the following way:

lazy val scenarios: Set[Scenario] = Set(S1, S2, ...)

def evaluationStream(indicatorsStream: DataStream[Indicators]): 
DataStream[Evaluation] =
  indicatorsStream.map { indicators =>
Evaluation(indicators.id , 
evaluateScenarios(indicators, scenarios))
  }

where the scenarios is not passed as a method argument but is a static object 
variable.

But this is not what I want, I would like to have a configurable scenarios 
which I can load from config file instead of a static object variable.

Any idea why this is happening? I also have other codes where I am also passing 
arguments and use them as part of my data flow and they are just working fine.

Many Thanks.

Best regards/祝好,

Chang Liu 刘畅


> On 22 Jan 2019, at 10:47, Chang Liu  wrote:
> 
> Dear community,
> 
> I am having a problem releasing the job.
> 
> 2019-01-22 10:42:50.098  WARN [Source: Custom Source -> Kafka -> 
> ConstructTxSepa -> FilterOutFailures -> ObtainActualTxSepa -> TxSepaStream -> 
> TxStream -> IndicatorsEvalStream -> TxEvalStream -> Sink: Print to Std. Out 
> (2/4)] [FileCache] - improper use of releaseJob() without a matching number 
> of createTmpFiles() calls for jobId 9e1557723a065925e01c7749899547fb
>  
> I searched online but only found this: 
> https://stackoverflow.com/questions/52135604/fix-improper-use-of-releasejob-without-a-matching-number-of-createtmpfiles
>  
> 
> 
> However, this warnings are keeping popping up and the job cannot be released 
> so that my data flow is not working.
> 
> But if I remove my last operator, it will work just fine. But my last 
> operator is justing doing some map operation. I am wondering what could be 
> the cause of this issue?
> 
> Many Thanks :)
> 
> Best regards/祝好,
> 
> Chang Liu 刘畅
> 
> 



improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId

2019-01-22 Thread Chang Liu
Dear community,

I am having a problem releasing the job.

2019-01-22 10:42:50.098  WARN [Source: Custom Source -> Kafka -> 
ConstructTxSepa -> FilterOutFailures -> ObtainActualTxSepa -> TxSepaStream -> 
TxStream -> IndicatorsEvalStream -> TxEvalStream -> Sink: Print to Std. Out 
(2/4)] [FileCache] - improper use of releaseJob() without a matching number of 
createTmpFiles() calls for jobId 9e1557723a065925e01c7749899547fb
 
I searched online but only found this: 
https://stackoverflow.com/questions/52135604/fix-improper-use-of-releasejob-without-a-matching-number-of-createtmpfiles
 


However, this warnings are keeping popping up and the job cannot be released so 
that my data flow is not working.

But if I remove my last operator, it will work just fine. But my last operator 
is justing doing some map operation. I am wondering what could be the cause of 
this issue?

Many Thanks :)

Best regards/祝好,

Chang Liu 刘畅