Re: [akka-user] Re: akka kafka stream parallel processing

2017-05-16 Thread Shannon Ma
hi, In the below code, my outmsg from the result, in one case, i need to flat it to multiple output messages, i am stuck as where/how can i do it, really appreciate if someone can help. Thanks Shannon Source,

Re: [akka-user] Re: akka kafka stream parallel processing

2017-05-04 Thread Shannon Ma
Hi, Some more newbie questions, so to keep my testing/learning, i want to use actor to process message. So in my map() i invoke the actor. 1, is this a good use of actor here? 2, Await.result is blocking, in this case is it a good idea to do non-blocking (or possible)? Thanks Shannon

Re: [akka-user] Re: akka kafka stream parallel processing

2017-05-04 Thread Shannon Ma
Got it working with this, still try to understand the last part (runForEach) s.mapAsyncUnordered(3, new Function,NotUsed>>, CompletionStage>() { public CompletionStage apply( Pair

Re: [akka-user] Re: akka kafka stream parallel processing

2017-05-03 Thread Shannon Ma
Many examples using lambda expression, like this Consumer.Control c = Consumer.committablePartitionedSource(consumerSettings, Subscriptions. topics("topic1")) .map(pair -> pair.second().via(business()).toMat(Sink.ignore(), Keep.both()) .run(materializer)) .mapAsyncUnordered(maxPartitions, (pair)

Re: [akka-user] Re: akka kafka stream parallel processing

2017-05-02 Thread Shannon Ma
The above code does not seem to consume any message, from the log 12:04| INFO | AbstractCoordinator.java 349 | Successfully joined group part1 with generation 1 12:04| INFO | ConsumerCoordinator.java 225 | Setting newly assigned partitions [sanitation-1, sanitation-0, sanitation-2] for group

Re: [akka-user] Re: akka kafka stream parallel processing

2017-05-02 Thread Shannon Ma
Thanks you are right, once i used all javadsl classes it gets compiled. I got the simple app works, i like to try how the partition works. The inner apply() is similar to the simple logic (no partition), now what i am not sure if 1, what should completionStage contain, i am putting

[akka-user] Re: akka kafka stream parallel processing

2017-05-01 Thread Shannon Ma
---made some changes, still with mapAsync, in the apply() the input is Source not Message, how can i get the message from it, i cannot find anything. TopicPartition par = new TopicPartition("test", 3); Source

[akka-user] Re: akka kafka stream parallel processing

2017-05-01 Thread Shannon Ma
In my testing i got this simple source->map->sink working, Source, Control> s = Consumer.committableSource(consumerSettings, Subscriptions.topics("sanitation")).asJava(); s.map(new

[akka-user] akka kafka stream parallel processing

2017-05-01 Thread Shannon Ma
Hi, How does akka kafka stream (or how can i do) handle parallel processing based on topic partitions, if i run multiple threads within an instance or run multiple instances, how does it rebalance and/or failover if one thread dies or if i add/remove instances. Thanks Shannon --

Re: [akka-user] akka stream kafka newbie question

2017-05-01 Thread Shannon Ma
After i cleaned up my maven repository, it is working now, though i did not narrow down to which lib/jar has the conflict. Thanks Shannon -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html

Re: [akka-user] akka stream kafka newbie question

2017-04-28 Thread Shannon Ma
> > > In your maven pomfile, if you want to explicitly list Akka as well, > you could then see in your IDE which versions that kaka-stream-kafka pulls > in and add those versions. > > -- > Johan > Akka Team > > On Fri, Apr 28, 2017 at 4:38 PM, Shannon

Re: [akka-user] akka stream kafka newbie question

2017-04-28 Thread Shannon Ma
With 2.12, i had to update the code, return new ProducerMessage.Message( new ProducerRecord("akkatest", msg.record().key(), msg.record().value()), msg.committableOffset()); but getting Exception in

Re: [akka-user] akka stream kafka newbie question

2017-04-28 Thread Shannon Ma
My Kafka version : 0.10.1.1, i should use akka-stream-kafka_2.12, right? -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives:

Re: [akka-user] akka stream kafka newbie question

2017-04-28 Thread Shannon Ma
Looks like it is the message type, from another example when i do this, it passes compilation. s.map(new Function, ProducerMessage.Message>() { public

Re: [akka-user] akka stream kafka newbie question

2017-04-28 Thread Shannon Ma
Here is my code final ActorSystem system = ActorSystem.create(); ActorMaterializer materializer = ActorMaterializer.create(system); final ConsumerSettings consumerSettings = ConsumerSettings.create(system, new SpecificAvroDeserializer(), new

Re: [akka-user] akka stream kafka newbie question

2017-04-28 Thread Shannon Ma
ve to do that explicitly: > > > someSource.runWith(sink, materializer) > > -- > Konrad `ktoso` Malawski > Akka <http://akka.io> @ Lightbend <http://lightbend.com> > > On 28 April 2017 at 08:36:43, Shannon Ma (shan...@gmail.com ) > wrote: > > Hi, > > I am

[akka-user] akka stream kafka newbie question

2017-04-27 Thread Shannon Ma
Hi, I am new to akka stream kafka, and am trying to follow some the examples to get start. I try to use Source.runWith(Sink.ignore()) or Soruce.runWith(Producer.commitableSink(producerSettings) and get this compilation error The method