This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit 5a8524c0ce7199f512105497d0a2791fe5db997b Author: lburgazzoli <lburgazz...@gmail.com> AuthorDate: Tue May 19 15:04:03 2020 +0200 Improve knative broker integration (add support for dynamic event type) #326 --- .../component/knative/http/KnativeHttpTest.java | 57 ++++++++++++++++++++++ .../camel/component/knative/KnativeComponent.java | 2 +- .../knative/ce/AbstractCloudEventProcessor.java | 7 ++- 3 files changed, 61 insertions(+), 5 deletions(-) diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java index a4a55a6..edb81cc 100644 --- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java +++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java @@ -1580,5 +1580,62 @@ public class KnativeHttpTest { server.stop(); } } + + @ParameterizedTest + @EnumSource(CloudEvents.class) + void testDynamicEventBridge(CloudEvent ce) throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); + final KnativeHttpServer server = new KnativeHttpServer(context, port); + + configureKnativeComponent( + context, + ce, + event( + Knative.EndpointKind.sink, + "default", + "localhost", + port, + mapOf( + Knative.CONTENT_TYPE, "text/plain" + )), + sourceEvent( + "event.source", + mapOf( + Knative.CONTENT_TYPE, "text/plain" + )) + ); + + RouteBuilder.addRoutes(context, b -> { + b.from("knative:event/event.source") + .setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).constant("event.sink") + .to("knative:event"); + }); + + context.start(); + + try { + server.start(); + + given() + .body("test") + .header(Exchange.CONTENT_TYPE, "text/plain") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event.source") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere") + .when() + .post() + .then() + .statusCode(204); + + HttpServerRequest request = server.poll(30, TimeUnit.SECONDS); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); + assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("event.sink"); + assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); + } finally { + server.stop(); + } + } } diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java index 9cbfa6b..8b30ee6 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java @@ -215,7 +215,7 @@ public class KnativeComponent extends DefaultComponent { throw new IllegalArgumentException("Expecting URI in the form of: 'knative:type/name', got '" + uri + "'"); } - final String type = StringHelper.before(remaining, "/"); + final String type = ObjectHelper.supplyIfEmpty(StringHelper.before(remaining, "/"), () -> remaining); final String name = StringHelper.after(remaining, "/"); final KnativeConfiguration conf = getKnativeConfiguration(); diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java index d5a73da..8c9be63 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java @@ -93,12 +93,11 @@ abstract class AbstractCloudEventProcessor implements CloudEventProcessor { headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType); // - // in case of events, the type of the event defined as URI param so we need + // in case of events, if the type of the event is defined as URI param so we need // to override it to avoid the event type be overridden by Messages's headers // - if (endpoint.getType() == Knative.Type.event) { - Object eventType = headers.get(CloudEvent.CAMEL_CLOUD_EVENT_TYPE); - + if (endpoint.getType() == Knative.Type.event && endpoint.getName() != null) { + final Object eventType = headers.get(CloudEvent.CAMEL_CLOUD_EVENT_TYPE); if (eventType != null) { logger.debug("Detected the presence of {} header with value {}: it will be ignored and replaced by value set as uri parameter {}", CloudEvent.CAMEL_CLOUD_EVENT_TYPE,