Good morning all.
I'm VERY new to camel so I'm still trying to get a grip on all the various
components so please bear with me.
I'm using quarkus/camel and have a route where I pull a message off of a kafka
topic (this part works perfectly btw) but then I want to send the string on to
a REST service and based on the response I get back from the service (i.e. 200
- Ok - Go on to next message, 400 - Bad Request - throw message in an error
queue or 503 - Service unavailable - Wait x amount of time and do y amounts of
retries before stopping the route completely).
My first attempt was to do all the REST calls in a .proccess java class.
But I now saw that you can do a .to(bean:xxx) and basically also call a java
class to do all the required code etc.
So my question is: what is the more "correct" way to do this (especially with
regards to getting application.properties values to the java class and then
sending/handeling the responses from the REST service. Would it be better to do
all the error/wait handeling in the java class or rather build it into the
route itself (with .errorHandler etc?)
Here is my current working route code:
@ApplicationScoped
public class EnrollementEventRoute extends RouteBuilder {
private EnrollmentEventRestSender eers;
@ConfigProperty(name = "kafka.topic.academia.registration")
String registrationTopicName;
@ConfigProperty(name = "kafka.academia.broker")
String kafkaBroker;
@ConfigProperty(name = "kafka.academia.config.clientId")
String kafkaClientId;
@ConfigProperty(name = "kafka.academia.registration.autoOffsetReset",
defaultValue = "latest")
String offset;
@ConfigProperty(name = "kafka.academia.config.groupId")
String groupId;
@ConfigProperty(name = "kafka.academia.config.keyDeserializer")
String keyDeserializer;
@ConfigProperty(name = "kafka.academia.config.valueDeserializer")
String valueDeserializer;
@ConfigProperty(name = "fms.registration.restservice.endpoint")
String restEndpoint;
@Override
public void configure() throws Exception {
eers = new EnrollmentEventRestSender(restEndpoint);
from(kafka(registrationTopicName)
.brokers(kafkaBroker)
.clientId(kafkaClientId)
.groupId(groupId)
.keyDeserializer(keyDeserializer)
.valueDeserializer(valueDeserializer)
.autoOffsetReset(offset))
.log("Registration Event received: ${body}")
.process(eers);
}
And then here is the code in the EnrollmentEventRestSender class:
@ApplicationScoped
public class EnrollmentEventRestSender implements Processor {
private String restEndpoint;
public EnrollmentEventRestSender() { //Dummy constructor needed.
};
public EnrollmentEventRestSender(String url) {
this.restEndpoint = url;
}
@Override
public void process(Exchange exchange) throws Exception {
try {
CloseableHttpClient client = HttpClients.createDefault();
System.out.println("Got endpoint of: " + restEndpoint);
HttpPost httpPost = new HttpPost(restEndpoint);
String json = (String) exchange.getIn().getBody();
System.out.println("Got JSON in Exchange: " + json);
StringEntity entity = new StringEntity(json);
httpPost.setEntity(entity);
// httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "text/plain; charset=utf-8");
CloseableHttpResponse response = client.execute(httpPost);
System.out.println("Got Response of: " +
response.getStatusLine().getStatusCode());
if (!(response.getStatusLine().getStatusCode()==200)) { // Something
wrong
InputStream is = response.getEntity().getContent();
BufferedReader rd = new BufferedReader(new InputStreamReader(is));
StringBuilder errReply = new StringBuilder();
String responseLine = null;
while ((responseLine = rd.readLine()) != null) {
errReply.append(responseLine.trim());
}
rd.close();
is.close();
System.out.println(errReply);
}
client.close();
}
catch (Exception ex ) {
ex.printStackTrace();
}
}
}
[https://www.sun.ac.za/productionfooter/email/ProductionFooter.jpg]<https://www.sun.ac.za/english/about-us/strategic-documents>
The integrity and confidentiality of this email are governed by these terms.
Disclaimer<https://www.sun.ac.za/emaildisclaimer/default.aspx>
Die integriteit en vertroulikheid van hierdie e-pos word deur die volgende
bepalings bereƫl.
Vrywaringsklousule<https://www.sun.ac.za/emaildisclaimer/default.aspx>