Hi,
In the code below, my outmsg comes from the result. In one case I need to
flatten it into multiple output messages, and I am stuck on where and how
to do that. I would really appreciate it if someone could help.
Thanks
Shannon
Source,
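For readers with the same question: turning one element into several is a "flat map" step. Akka Streams has a mapConcat operator for this on a Source or Flow. The sketch below uses only the JDK so it runs without Akka; the class name and the rule that ';' separates sub-messages are assumptions made up for illustration, not part of the original code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlattenSketch {
    // Hypothetical rule: a result containing ';' stands for several output messages.
    public static List<String> expand(String result) {
        return Arrays.asList(result.split(";"));
    }

    // Each input element may become one or many output elements.
    public static List<String> flatten(List<String> results) {
        return results.stream()
                .flatMap(r -> expand(r).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [a, b, c, d, e]
        System.out.println(flatten(Arrays.asList("a", "b;c;d", "e")));
    }
}
```

In a stream, the same shape would be a function from one message to a list of messages, applied before the sink.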
Hi,
Some more newbie questions. To continue my testing and learning, I want to
use an actor to process each message, so in my map() I invoke the actor.
1, Is this a good use of an actor?
2, Await.result is blocking. Is it a good idea (or even possible) to make
this non-blocking instead?
Thanks
Shannon
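On the blocking question, a minimal JDK-only sketch of the non-blocking shape may help. It assumes the actor call can be exposed as a CompletionStage. askWorker is a hypothetical stand-in, not an Akka API. Operators such as mapAsync and mapAsyncUnordered accept a function that returns a CompletionStage, so a stage like the one built in process() could be returned from there instead of waiting on the reply inside map().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class NonBlockingSketch {
    // Hypothetical stand-in for asking an actor: the reply is computed on another thread.
    public static CompletionStage<String> askWorker(String msg) {
        return CompletableFuture.supplyAsync(() -> msg.toUpperCase());
    }

    // Non-blocking: describe what to do with the reply and return immediately.
    public static CompletionStage<String> process(String msg) {
        return askWorker(msg).thenApply(reply -> "processed:" + reply);
    }

    public static void main(String[] args) {
        CompletionStage<String> result = process("hello");
        // join() blocks; it is used here only so the demo can print before exiting.
        System.out.println(result.toCompletableFuture().join()); // prints processed:HELLO
    }
}
```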
Got it working with this; I am still trying to understand the last part (runForeach):
s.mapAsyncUnordered(3, new
Function,NotUsed>>,
CompletionStage>() {
public CompletionStage apply(
Pair
Many examples use lambda expressions, like this:
Consumer.Control c =
    Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
        .map(pair -> pair.second().via(business()).toMat(Sink.ignore(), Keep.both())
            .run(materializer))
        .mapAsyncUnordered(maxPartitions, (pair)
The above code does not seem to consume any messages. From the log:
12:04| INFO | AbstractCoordinator.java 349 | Successfully joined group part1 with generation 1
12:04| INFO | ConsumerCoordinator.java 225 | Setting newly assigned partitions [sanitation-1, sanitation-0, sanitation-2] for group
Thanks, you are right: once I used all javadsl classes, it compiled.
I got the simple app working, and now I would like to try out how
partitioning works. The inner apply() is similar to the simple logic (no
partition). What I am not sure about:
1, What should the CompletionStage contain? I am putting
---I made some changes, still with mapAsync. In apply() the input is a
Source, not a Message. How can I get the message from it? I cannot find
anything.
TopicPartition par = new TopicPartition("test", 3);
Source
In my testing I got this simple source->map->sink working:
Source, Control> s =
Consumer.committableSource(consumerSettings,
Subscriptions.topics("sanitation")).asJava();
s.map(new
Hi,
How does Akka Stream Kafka handle parallel processing based on topic
partitions (or how can I set that up)? If I run multiple threads within one
instance, or run multiple instances, how does it rebalance or fail over
when a thread dies or when I add or remove instances?
Thanks
Shannon
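As a conceptual illustration of per-partition parallelism, here is a JDK-only sketch. It is not Akka Stream Kafka code: Rec, processByPartition, and the thread-pool sizing are assumptions made up for this example. The idea it shows is the one behind Consumer.committablePartitionedSource, which hands out one sub-source per assigned partition: partitions are processed in parallel while order is kept inside each partition. Which partitions an instance receives is decided by the Kafka consumer group, which reassigns partitions among the members sharing a group id as members join or leave.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class PartitionSketch {
    // Hypothetical record: only the partition number and the payload matter here.
    public static final class Rec {
        final int partition;
        final String value;
        public Rec(int partition, String value) {
            this.partition = partition;
            this.value = value;
        }
    }

    // Process partitions in parallel, but keep the order inside each partition.
    public static Map<Integer, List<String>> processByPartition(List<Rec> records) {
        Map<Integer, List<Rec>> byPartition = records.stream()
                .collect(Collectors.groupingBy(r -> r.partition, TreeMap::new, Collectors.toList()));
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, byPartition.size()));
        try {
            // Start one task per partition.
            Map<Integer, CompletableFuture<List<String>>> running = new TreeMap<>();
            byPartition.forEach((partition, recs) -> running.put(partition,
                    CompletableFuture.supplyAsync(() -> {
                        List<String> out = new ArrayList<>();
                        for (Rec r : recs) {
                            out.add("p" + partition + ":" + r.value);
                        }
                        return out;
                    }, pool)));
            // Wait for every partition task and collect its output.
            Map<Integer, List<String>> results = new TreeMap<>();
            running.forEach((partition, f) -> results.put(partition, f.join()));
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Rec> records = Arrays.asList(new Rec(0, "a"), new Rec(1, "b"), new Rec(0, "c"));
        // prints {0=[p0:a, p0:c], 1=[p1:b]}
        System.out.println(processByPartition(records));
    }
}
```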
--
After I cleaned up my Maven repository, it is working now, though I did not
narrow down which lib/jar had the conflict.
Thanks
Shannon
--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>
>
> In your Maven pom file, if you want to explicitly list Akka as well,
> you could then see in your IDE which versions akka-stream-kafka pulls
> in and add those versions.
>
> --
> Johan
> Akka Team
>
> On Fri, Apr 28, 2017 at 4:38 PM, Shannon
With 2.12, I had to update the code:
return new ProducerMessage.Message(
    new ProducerRecord("akkatest", msg.record().key(), msg.record().value()),
    msg.committableOffset());
but I am getting
Exception in
My Kafka version is 0.10.1.1, so I should use akka-stream-kafka_2.12, right?
It looks like the problem is the message type. When I do this, based on
another example, it compiles:
s.map(new
Function,
ProducerMessage.Message>() {
public
Here is my code:
final ActorSystem system = ActorSystem.create();
ActorMaterializer materializer = ActorMaterializer.create(system);
final ConsumerSettings consumerSettings =
ConsumerSettings.create(system, new SpecificAvroDeserializer(), new
ve to do that explicitly:
>
>
> someSource.runWith(sink, materializer)
>
> --
> Konrad `ktoso` Malawski
> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>
> On 28 April 2017 at 08:36:43, Shannon Ma (shan...@gmail.com )
> wrote:
>
> Hi,
>
> I am
Hi,
I am new to Akka Stream Kafka and am trying to follow some of the examples
to get started.
I try to use
Source.runWith(Sink.ignore()) or
Source.runWith(Producer.commitableSink(producerSettings))
and get this compilation error:
The method