
To clarify, the code causing the issue is below; it is nothing clever. The code 
fails at the line in bold. The Long index values are assigned in sequence earlier 
in the job.

val scaledReadings: DataStream[(Int, Long, Double, Double)] = maxChannelReading
  .map { in => <>(s"scaledReadings $in")
    (in._1, in._2, in._3 / in._4 + 2.0D, in._3) }

val logRatioWindow: DataStream[(Int, Long, Int, Double)] = scaledReadings
  .countWindow(100, 99)
  .process(new logRatioWindowFunction())
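For anyone following along, here is a minimal plain-Scala sketch (no Flink dependency) of how countWindow(100, 99) is assumed to behave: it fires after every 99 elements per key and keeps at most the last 100 elements seen so far. That firing rule is an assumption about Flink's count trigger and count evictor rather than something shown in the code above, and simulateCountWindow is a hypothetical helper, not a Flink API.

```scala
object CountWindowSketch {
  // Hypothetical helper, not part of Flink. Assumes the window fires after
  // every `slide` elements and keeps at most the last `size` elements.
  def simulateCountWindow[A](elements: Seq[A], size: Int, slide: Int): Seq[Seq[A]] = {
    require(size > 0 && slide > 0, "size and slide must be positive")
    (slide to elements.length by slide).map { fired =>
      elements.slice(math.max(0, fired - size), fired)
    }
  }

  def main(args: Array[String]): Unit = {
    // Indexes 1..300 for a single channel, in order.
    val windows = simulateCountWindow(1L to 300L, size = 100, slide = 99)
    // Print the length and index range of each simulated firing.
    windows.foreach(w => println(s"length=${w.length} first=${w.head} last=${w.last}"))
  }
}
```

Under that assumption the first firing holds only 99 elements (the l < 100 branch), and each later firing holds 100 elements and shares one index with the previous window, so the ratio across the window boundary is not lost.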


class logRatioWindowFunction extends ProcessWindowFunction[(Int, Long, Double, Double),
  (Int, Long, Int, Double), Tuple, GlobalWindow] {

  def process(key: Tuple, context: logRatioWindowFunction.this.Context,
              input: Iterable[(Int, Long, Double, Double)],
              out: Collector[(Int, Long, Int, Double)]) = {

    val a: Array[(Int, Long, Double, Double)] = input.toArray
    val ch = a(0)._1
    val s = a(0)._2
    val l = input.size

    if (l < 100) <>(s"Log ratio window length $l on channel $ch at sample $s")

    // Indexes within a window must increase by exactly one
    for (i <- 1 to a.size - 1)
      assert(a(i)._2 == a(i-1)._2 + 1,
        "logRatioWindowFunction:Failure non-monotonic indexes " + a(i-1)._2 + " and " + a(i)._2)

    if (l == 100) {
      for (i <- 0 to l - 2) {
        // Log ratio of adjacent scaled readings, rounded to an Int
        val v: Int = rint(100 * log(E + a(i+1)._3 / a(i)._3)).toInt
        assert(v > 0, "Bad minhash in medianLogRatioWindowFunction " + v)
        Job.LOG.debug("logRatioWindowFunction [" + a(i+1)._1 + ", " + a(i+1)._2 +
          ", " + v + ", " + a(i+1)._4 + "]")
        out.collect(scala.Tuple4(a(i+1)._1, a(i+1)._2, v, a(i+1)._4))
      }
      Job.LOG.debug("logRatioWindowFunction [" + a.head._1 + ", " + a.head._2 +
        " ... " + a.last._2 + "] collected")
    }
  }
}

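The two checks inside the window function can also be exercised without Flink. The following is only a sketch under stated assumptions: LogRatioSketch, Reading, contiguous and logRatios are hypothetical names introduced here for illustration, and the tuple layout (channel, index, scaled value, raw value) is inferred from the map step above.

```scala
import scala.math.{E, log, rint}

object LogRatioSketch {
  // Assumed layout: (channel, index, scaled value, raw value)
  type Reading = (Int, Long, Double, Double)

  // True when every index is exactly one more than its predecessor.
  def contiguous(window: Seq[Reading]): Boolean =
    window.map(_._2).sliding(2).forall {
      case Seq(prev, next) => next == prev + 1
      case _               => true // fewer than two readings: nothing to compare
    }

  // Same arithmetic as the loop in the window function:
  // rint(100 * log(E + next / prev)) for each adjacent pair of readings.
  def logRatios(window: Seq[Reading]): Seq[(Int, Long, Int, Double)] =
    window.zip(window.drop(1)).map { case (prev, next) =>
      (next._1, next._2, rint(100 * log(E + next._3 / prev._3)).toInt, next._4)
    }
}
```

Feeding a window through contiguous before logRatios separates the ordering problem (indexes arriving out of sequence) from the arithmetic, which may help narrow down where the assertion failure originates.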

> On 17 Jul 2018, at 00:15, Martin, Nick wrote:
> Is value(index-1) stored in Keyed State, or just a local variable inside the 
> operator?
> -----Original Message-----
> From: Nicholas Walton [ <>] 
> Sent: Monday, July 16, 2018 1:33 PM
> To: <>
> Subject: Parallelism and keyed streams
> I have a stream of tuples <channel: Int, index: Long, value: Double>, which 
> I form into a KeyedStream using keyBy on channel. I then need to process each 
> channel in parallel. Each parallel stream must be processed in strict 
> sequential order by index to calculate the ratios 
> value(index)/value(index-1). If I set parallelism to 1 all is well: each 
> channel is processed in order of index 1, 2, 3, 4, …
> My problem is that when I set parallelism to a value greater than 1, each 
> channel’s KeyedStream appears to be split across multiple processes, so a 
> channel may be processed out of sequence, for example as value(2), value(5), 
> value(6), value(9), …
> The number of channels N is unknown. How do I set up N processing streams 
> with an unknown parallelism so that each stream processes its channel by 
> strictly increasing index v(1), v(2), …, v(t), v(t+1), …, v(t+n)?
> Thanks in advance
> Nick Walton
