Sync commit to kafka 0.10

2017-08-30 Thread krot.vyacheslav
Hi,

I'm looking for a way to make a synchronous commit of offsets to Kafka 0.10.
commitAsync works well, but I'd like to proceed to the next job only after a
successful commit; a small additional latency is not an issue for my use
case. I know I can store offsets somewhere else, but the built-in Kafka
offset storage looks good and easy to use. What is the correct way to do
this? I tried the first approach that came to mind, like this:
val latch = new CountDownLatch(1)
commitAsync(offsetRanges, new OffsetCommitCallback() {
  override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
                          exception: Exception): Unit = {
    latch.countDown()
  }
})
latch.await()

but this does not work - I get very weird effects - new data from Kafka is
read with long delays.

Is there an elegant, correct way to do this?
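The latch pattern being attempted can be sketched outside Spark; here a hypothetical fireAsync stands in for commitAsync, invoking its callback from another thread (all names in this sketch are illustrative assumptions, not Spark or Kafka API):

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for an async commit API: it runs the
// supplied callback on another thread once the "commit" finishes.
def fireAsync(callback: Either[Throwable, Unit] => Unit): Unit =
  new Thread(() => callback(Right(()))).start()

// Turn the async call into a synchronous one by waiting on a latch.
def commitSyncSketch(timeoutMs: Long): Boolean = {
  val latch = new CountDownLatch(1)
  val ok    = new AtomicBoolean(false)
  fireAsync { result =>
    ok.set(result.isRight)
    latch.countDown()
  }
  // A timeout keeps a lost callback from blocking the caller forever.
  latch.await(timeoutMs, TimeUnit.MILLISECONDS) && ok.get
}
```

Note that blocking the driver thread this way between batches is a plausible cause of the reading delays described above, since the callback may only fire once the next batch's processing gets scheduled.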



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Storing a lot of state with updateStateByKey

2015-05-14 Thread krot.vyacheslav
Hi all, 
I'm a complete newbie to Spark and Spark Streaming, so the question may seem
obvious; sorry for that.
Is it okay to store Seq[Data] in state when using 'updateStateByKey'? I have
a function with signature
def saveState(values: Seq[Msg], value: Option[Iterable[Msg]]):
Option[Iterable[Msg]] that stores about 200k records in the iterable. Most
examples I've seen keep some kind of accumulator in state, so I'm wondering
if holding a whole collection is a normal use case.

Maybe you can suggest how to solve my task without this mutable state. I
have a Kafka topic that produces about 20k messages/sec. I need to group
messages by some key and send the groups to another topic. A group should be
sent when its message count exceeds some count N, OR when a predefined time
T has passed since the first message in the group arrived, even if the group
has fewer than N messages. Window functions come to mind first, but the
problem is that I need to send a group as soon as N messages have arrived,
not wait until the window duration has passed.
I decided to set the batch interval to 0.5 sec, with T about 3 sec. On each
batch I first take the groups that have enough messages and send them. The
rest of the messages I put into shared state. In updateStateByKey I have all
messages that have not been sent yet; I again try to group them and send
those groups that have enough messages. This way I check messages with a
latency of 0.5 sec instead of 3 sec.
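The per-batch split described above, group by key and ship the groups that reached the threshold N while carrying the rest over, can be sketched on plain Scala collections (Msg, its key field, and splitBatch are illustrative assumptions, not Spark API):

```scala
case class Msg(key: Int, payload: String)

// Split one batch of messages into (readyToSend, carryOver):
// groups that reached the threshold n are sent now, the rest are
// kept for the next batch (where time-based flushing would apply).
def splitBatch(msgs: Seq[Msg], n: Int): (Map[Int, Seq[Msg]], Map[Int, Seq[Msg]]) =
  msgs.groupBy(_.key).partition { case (_, group) => group.size >= n }

val batch = Seq(Msg(1, "a"), Msg(1, "b"), Msg(2, "c"))
val (ready, carry) = splitBatch(batch, 2)
// key 1 has 2 messages and is ready; key 2 has only 1 and is carried over
```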

Update function:
 def saveState(values: Seq[Iterable[Msg]], value: Option[(Iterable[Msg],
Iterable[Msg])]): Option[(Iterable[Msg], Iterable[Msg])] = {
  // when is values.size greater than 1? I haven't looked into that yet.
  if (values.size > 1) {
    throw new NullPointerException
  }

  // notSent - those that have not been sent yet
  val (notSent, _) = value.getOrElse((List(), List()))
  // discard sent

  // here goes more complex logic with verification if message should be
sent based on its arrival time
  // for now it is simplified
  val all = notSent ++ values(0)
  val result = all.groupBy(_.key).partition(ifNotSend _)

  Some((result._1.values.flatten, result._2.values.flatten))
}

The whole code:
val batchSize = 5

// will persist() speed anything up here?
val grouped = inputStream.map(msg => (msg.key, msg)).groupByKey().persist()

def ifSend(x: (Int, Iterable[_])) = x._2.size >= batchSize
def ifNotSend(x: (Int, Iterable[_])) = !ifSend(x)

val readyToSend = grouped.filter(ifSend _)
readyToSend.foreachRDD(rdd => {
  // send to kafka
})
// these should not be sent immediately but combined with messages kept in state
val incomplete = grouped.filter(ifNotSend _)

/**
 * returns (Iterable[Msg], Iterable[Msg])
 * _1 - messages that should not be sent and are preserved for the next
 * batch execution
 * _2 - messages that should be sent in this batch
 */
def saveState(values: Seq[Iterable[Msg]], value: Option[(Iterable[Msg],
Iterable[Msg])]): Option[(Iterable[Msg], Iterable[Msg])] = {
  if (values.size > 1) {
    throw new NullPointerException
  }

  // notSent - those that have not been sent yet
  val (notSent, _) = value.getOrElse((List(), List()))
  // discard sent

  // here goes more complex logic with verification if message should be
sent based on its arrival time
  // for now it is simplified
  val all = notSent ++ values(0)
  val result = all.groupBy(_.key).partition(ifNotSend _)

  Some((result._1.values.flatten, result._2.values.flatten))
}

val state = incomplete.updateStateByKey(saveState _)
state.foreachRDD(rdd => {
  val messagesToSend = rdd.filter(x => x._2._2.nonEmpty)
    .map(x => x._2._2)
  println(messagesToSend.collect().flatten.mkString(","))
  println()
})
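As a sanity check, the carry-over behaviour of saveState can be exercised without Spark on plain collections (Msg, step, and the threshold here are simplified assumptions mirroring the code above):

```scala
case class Msg(key: Int, payload: String)
val batchSize = 2
def ifSend(x: (Int, Iterable[Msg])) = x._2.size >= batchSize
def ifNotSend(x: (Int, Iterable[Msg])) = !ifSend(x)

// Same shape as saveState: merge previously unsent messages with the
// new batch, then partition into (still not sent, send now).
def step(newMsgs: Iterable[Msg], notSent: Iterable[Msg]): (Iterable[Msg], Iterable[Msg]) = {
  val all = notSent ++ newMsgs
  val (keep, send) = all.groupBy(_.key).partition(ifNotSend _)
  (keep.values.flatten, send.values.flatten)
}

// One message for key 1 arrives and is carried over; a second arrival
// completes the group, which is then released for sending.
val (carried, sent1) = step(Seq(Msg(1, "a")), Nil)
val (_, sent2) = step(Seq(Msg(1, "b")), carried)
```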


Maybe you could suggest a better/more efficient solution?

Thanks in advance.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Storing-a-lot-of-state-with-updateStateByKey-tp22890.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org