Re: IllegalArgumentException with CEP & reinterpretAsKeyedStream

2019-05-05 Thread Le-Van Huyen
Thank you Yun.

I haven't yet followed your guide to check (it would take me some time to
work out how to do that).
However, I can now confirm that the "union" is the culprit. In my Flink
Console GUI, I can see that the link from StreamC to CEP via "union" is a
FORWARD link, not a HASH one, which means that having "keyBy" right before
the "union" has no effect at all. If I put a placebo "map" between the
"keyBy" on streamC and the "union", then the problem is solved:
.union(streamC.keyBy(r => (r.id1, r.id2)).map(r => r))
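
A minimal sketch of that workaround (stream and field names are from my
code in the quoted message below; "filteredAB" stands for the result of
connecting A with B and flat-mapping):

    // keyBy sets up a HASH exchange into the next operator, and the
    // placebo identity map pins that exchange in place, so records
    // arrive at "union" already partitioned by key.
    val keyedC = streamC
      .keyBy(r => (r.id1, r.id2))
      .map(r => r)
    val cepInput = filteredAB.union(keyedC)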

I don't know why "union" behaves like that, though. I could not find that
behaviour mentioned in any documentation.

Thanks a lot for your help.

Regards,
Averell


On Sun, May 5, 2019 at 11:22 PM Yun Tang  wrote:

> Hi Averell
>
> I think this is because after 'union', the input stream actually does not
> follow the rule that the key must be pre-partitioned in *EXACTLY* the same
> way Flink's keyBy would partition the data [1]. An easy way to verify this
> is to follow [2] and check whether each sub-task of the union stream
> contains exactly what the downstream task contains.
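>
> For example, a sketch of such a check (assuming the tuple key type from
> your job; inside a RichFunction the actual subtask index would come from
> getRuntimeContext.getIndexOfThisSubtask):
>
>     import org.apache.flink.runtime.state.KeyGroupRangeAssignment
>
>     // The subtask index that Flink's keyBy would route this key to.
>     // If the records seen by a union subtask don't all map back to that
>     // subtask's own index, the stream is not pre-partitioned the way
>     // reinterpretAsKeyedStream requires.
>     def expectedSubtask(key: (String, String),
>                         maxParallelism: Int,
>                         parallelism: Int): Int =
>       KeyGroupRangeAssignment.assignKeyToParallelOperator(
>         key, maxParallelism, parallelism)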
>
> Best
> Yun Tang
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
> [2]
> https://github.com/apache/flink/blob/4e505c67542a45c82c763c12099bdfc621c9e476/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java#L223
>
> --
> *From:* Averell 
> *Sent:* Sunday, May 5, 2019 16:43
> *To:* user@flink.apache.org
> *Subject:* IllegalArgumentException with CEP & reinterpretAsKeyedStream
>
> Hi everyone,
>
> I have a big stream A, filtered by flags from a small stream B, then
> unioned with another stream C to become the input of my CEP.
> As the three streams A, B, and C are all keyed, I expected that the output
> stream resulting from connecting/unioning them would also be keyed, so I
> used reinterpretAsKeyedStream before feeding it into CEP. With this, I
> got an IllegalArgumentException (full stack trace below).
> If I use a parallelism of 1, or if I don't use reinterpretAsKeyedStream
> (and use keyBy manually), then there's no such exception.
>
> I don't know how to debug this error, and I'm not sure whether I should
> use keyed streams with CEP.
>
> Thanks and best regards,
> Averell
>
>
> My code:
>
>     val cepInput = streamA.keyBy(r => (r.id1, r.id2))
>       .connect(streamB.keyBy(r => (r.id1, r.id2)))
>       .flatMap(new MyCandidateFilterFunction())
>       .union(streamC.keyBy(r => (r.id1, r.id2)))
>
>     val cepOutput = MyCEP(
>       new DataStreamUtils(cepInput).reinterpretAsKeyedStream(r => (r.id1, r.id2)),
>       longPeriod, threshold, shortPeriod)
>
>     object MyCEP {
>       def apply(input: KeyedStream[Event, _],
>                 longPeriod: Int,
>                 threshold: Int,
>                 shortPeriod: Int): DataStream[Event] = {
>
>         val patternLineIsUp =
>           Pattern.begin[Event]("period1")
>             .where((value: Event, ctx: CepContext[Event]) =>
>               accSum(_.counter, Seq("period1"), value, ctx) < threshold)
>             .times(longPeriod - shortPeriod).consecutive()
>             .next("period2")
>             .where((value: Event, ctx: CepContext[Event]) =>
>               accSum(_.counter, Seq("period1", "period2"), value, ctx) < threshold
>                 && value.status == "up")
>             .times(shortPeriod).consecutive()
>
>         collectPattern(input, patternLineIsUp)
>       }
>
>       // Sum f over all events already matched for the given pattern keys,
>       // plus the current event.
>       private def accSum(f: Event => Long, keys: Seq[String],
>                          currentEvent: Event, ctx: CepContext[Event]): Long =
>         keys.map(key => ctx.getEventsForPattern(key).map(f).sum).sum + f(currentEvent)
>
>       private def collectPattern(inputStream: KeyedStream[Event, _],
>                                  pattern: Pattern[Event, Event]): DataStream[Event] =
>         CEP.pattern(inputStream, pattern)
>           .process((map: util.Map[String, util.List[Event]],
>                     ctx: PatternProcessFunction.Context,
>                     collector: Collector[Event]) => {
>             // Emit the last event of the "period2" match.
>             val records = map.get("period2")
>             collector.collect(records.get(records.size() - 1))
>           })
>     }
>
> The exception:
> Exception in thread "main" 12:43:13,103 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Stopped Akka RPC service.
> org.apache.flink.runtime.client.JobExecutionException: Job execution
> 

Re: Error restoring from savepoint while there's no modification to the job

2018-10-10 Thread Le-Van Huyen
Hi Stefan, Dawid,

I hadn't changed anything in the configuration. Env's parallelism stayed at
64. Some source/sink operators have parallelism of 1 to 8. I'm using Flink
1.7-SNAPSHOT, with the code pulled from master about 5 days back. Savepoint
was saved to either S3 or HDFS (I tried multiple times), and had not been
moved.

Is there any kind of improper user code that can cause such an error?

Thanks and best regards,
Averell

On Wed, Oct 10, 2018 at 7:02 PM Stefan Richter 
wrote:

> Hi,
>
> adding to Dawid's questions, it would also be very helpful to know which
> Flink version was used to create the savepoint, which Flink version was
> used in the restore attempt, and whether the savepoint was moved or
> modified. Outside of potential conflicts with those things, I would not
> expect anything like this.
>
> Best,
> Stefan
>
> > On 10. Oct 2018, at 09:51, Dawid Wysakowicz 
> wrote:
> >
> > Hi Averell,
> >
> > Did you try to scale the job up, i.e. did you increase the job
> > parallelism? Did you increase the job's max parallelism by chance? If so,
> > that is not supported. The max-parallelism parameter is used to create
> > key groups that can then be assigned to parallel operators. This
> > parameter cannot be changed for a job that is to be restored.
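> >
> > As an illustration (a minimal sketch using Flink's
> > KeyGroupRangeAssignment utility; the key value here is hypothetical):
> > the key group depends only on the key's hash and the max parallelism,
> > so changing max parallelism re-shuffles every key's assignment and the
> > saved state can no longer be matched to the restored operators.
> >
> >     import org.apache.flink.runtime.state.KeyGroupRangeAssignment
> >
> >     object KeyGroupDemo extends App {
> >       val maxParallelism = 128    // must stay fixed across restores
> >       val parallelism = 64
> >       val myKey = ("id1", "id2")  // hypothetical key value
> >       // key -> key group: depends on the key hash and maxParallelism only
> >       val keyGroup =
> >         KeyGroupRangeAssignment.assignToKeyGroup(myKey, maxParallelism)
> >       // key group -> parallel subtask index
> >       val subtask = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
> >         maxParallelism, parallelism, keyGroup)
> >       println(s"key $myKey -> key group $keyGroup -> subtask $subtask")
> >     }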
> >
> > If this is not the case, maybe Stefan (cc'd) has some ideas about what
> > could go wrong.
> >
> > Best,
> >
> > Dawid
> >
> >
> > On 10/10/18 09:23, Averell wrote:
> >> Hi everyone,
> >>
> >> I'm getting the following error when trying to restore from a savepoint.
> >> Below is the output from bin/flink, and the attachment is a TM log.
> >> I didn't make any change to the app between taking the savepoint and
> >> restoring it. All window operators have been assigned a unique ID string.
> >>
> >> Could you please take a look?
> >>
> >> Thanks and best regards,
> >> Averell
> >>
> >> taskmanager.gz
> >> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/taskmanager.gz>
> >>
> >> org.apache.flink.client.program.ProgramInvocationException: Job failed.
> >> (JobID: 606ad5239f5e23cedb85d3e75bf76463)
> >>  at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
> >>  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
> >>  at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> >>  at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:664)
> >>  at com.nbnco.csa.analysis.copper.sdc.flink.StreamingSdcWithAverageByDslam$.main(StreamingSdcWithAverageByDslam.scala:442)
> >>  at com.nbnco.csa.analysis.copper.sdc.flink.StreamingSdcWithAverageByDslam.main(StreamingSdcWithAverageByDslam.scala)
> >>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>  at java.lang.reflect.Method.invoke(Method.java:498)
> >>  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> >>  at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> >>  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> >>  at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> >>  at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> >>  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> >>  at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> >>  at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> >>  at java.security.AccessController.doPrivileged(Native Method)
> >>  at javax.security.auth.Subject.doAs(Subject.java:422)
> >>  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
> >>  at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> >> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> >>  at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> >>  at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
> >>  ... 22 more
> >> Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
> >>  at
>