Martin,

To clarify, the code causing the issue is below; it is nothing clever. The code 
fails at the line in bold. The Long index values are set earlier, in sequence 
1, 2, 3, 4, 5, 6, 7, ...

val scaledReadings : DataStream[(Int,Long, Double, Double)] = maxChannelReading
      .keyBy(0)
      .map { in =>
        LOG.info(s"scaledReadings $in")
        (in._1, in._2, in._3/in._4 + 2.0D, in._3) }


 val logRatioWindow: DataStream[(Int,Long, Int, Double)] = scaledReadings
      .keyBy(0)
      .countWindow(100, 99)
      .process(new logRatioWindowFunction() )


and

class logRatioWindowFunction extends ProcessWindowFunction[
  (Int, Long, Double, Double), (Int, Long, Int, Double),
  org.apache.flink.api.java.tuple.Tuple, GlobalWindow] {

  def process(key: Tuple,
              context: logRatioWindowFunction.this.Context,
              input: Iterable[(Int, Long, Double, Double)],
              out: Collector[(Int, Long, Int, Double)]) =
  {

    val a: Array[(Int, Long, Double, Double)] = input.toArray
    val ch = a(0)._1
    val s = a(0)._2
    val l = input.size

    if (l < 100) Job.LOG.info(s"Log ratio window length $l on channel $ch at sample $s")

    for (i <- 1 to a.size - 1)
      assert(a(i)._2 == a(i-1)._2 + 1,
        "logRatioWindowFunction:Failure non-monotonic indexes " + a(i-1)._2 + " and " + a(i)._2)

    if (l == 100) {
      for (i <- 0 to l-2) {
        val v: Int = rint(100 * log(E + a(i+1)._3 / a(i)._3)).toInt
        assert(v > 0, "Bad minhash in medianLogRatioWindowFunction " + v)
        Job.LOG.debug("logRatioWindowFunction [" + a(i+1)._1 + ", " + a(i+1)._2 +
          ", " + v + ", " + a(i+1)._4 + "]")
        out.collect(scala.Tuple4(a(i+1)._1, a(i+1)._2, v, a(i+1)._4))
      }
      Job.LOG.debug("logRatioWindowFunction [" + a.head._1 + ", " + a.head._2 +
        " ... " + a.last._2 + "] collected")
    }
  }

}
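
For reference, the per-window calculation above can be sketched as a plain Scala function with no Flink dependency, which makes it easy to try on a small sample. This is only an illustrative sketch: the names LogRatioSketch and logRatios are hypothetical, and sorting the buffered elements by index before the contiguity check is an assumption about how one might guard against out-of-order arrival inside a window. It is not something the job above does, and it would not help if indexes are missing from the window altogether.

```scala
import scala.math.{E, log, rint}

object LogRatioSketch {
  // (channel, index, scaled value, raw value), as in scaledReadings
  type Reading = (Int, Long, Double, Double)
  // (channel, index, rounded log ratio, raw value), as in logRatioWindow
  type Ratio = (Int, Long, Int, Double)

  // Hypothetical helper: orders the window by index (an assumption, the
  // original code relies on arrival order), checks that adjacent indexes
  // are contiguous, and emits one ratio per adjacent pair.
  def logRatios(window: Iterable[Reading]): Seq[Ratio] = {
    val a = window.toVector.sortBy(_._2)
    a.sliding(2).collect { case Vector(prev, cur) =>
      require(cur._2 == prev._2 + 1,
        s"non-contiguous indexes ${prev._2} and ${cur._2}")
      (cur._1, cur._2, rint(100 * log(E + cur._3 / prev._3)).toInt, cur._4)
    }.toVector // built eagerly so the require checks run immediately
  }

  def main(args: Array[String]): Unit = {
    // Elements deliberately supplied out of order
    val window = Seq((7, 3L, 4.0, 1.0), (7, 1L, 1.0, 1.0), (7, 2L, 2.0, 1.0))
    logRatios(window).foreach(println)
  }
}
```

Run against a window with a gap in the indexes, the require fails in the same way as the assert in the window function, so it can help distinguish reordering within a window from elements that never reached it.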


> On 17 Jul 2018, at 00:15, Martin, Nick <nick.mar...@orbitalatk.com> wrote:
> 
> Is value(index-1) stored in Keyed State, or just a local variable inside the 
> operator?
> 
> -----Original Message-----
> From: Nicholas Walton [mailto:nwal...@me.com]
> Sent: Monday, July 16, 2018 1:33 PM
> To: user@flink.apache.org
> Subject: Parallelism and keyed streams
> 
> I have a stream of tuples <channel: Int, index: Long, value: Double>, which 
> I form into a keyedStream using keyBy on channel. I then need to process each 
> channel in parallel. Each parallel stream must be processed in strict 
> sequential order by index to calculate the ratios 
> value(index)/value(index-1). If I set parallelism to 1 all is well: each 
> channel is processed in order of index 1, 2, 3, 4, ...
> 
> My problem is that when I set parallelism to a value greater than 1, each 
> channel’s keyedStream appears to be split across multiple processes. So a 
> channel may be processed wrongly, for example as value(2), value(5), 
> value(6), value(9), ...
> 
> The number of channels N is unknown. So how do I rig up N processing streams 
> with an unknown parallelism so that each stream processes each channel by 
> strictly increasing index v(1), v(2), ..., v(t), v(t+1), ..., v(t+n)?
> 
> Thanks in advance
> 
> Nick Walton
> 
> 
