This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/master by this push: new 3c3e250 camel-knative: support for cloud events specs v0.2 #376 3c3e250 is described below commit 3c3e2502808c8b93f728c1d3ae5e94b8ad110741 Author: lburgazzoli <lburgazz...@gmail.com> AuthorDate: Fri Feb 1 17:03:06 2019 +0100 camel-knative: support for cloud events specs v0.2 #376 --- .../camel/component/knative/KnativeComponent.java | 30 +- .../component/knative/KnativeConfiguration.java | 22 ++ .../camel/component/knative/KnativeEndpoint.java | 87 +---- .../camel/component/knative/KnativeSupport.java | 13 - .../knative/ce/CloudEventsProcessors.java | 65 ++++ .../org/apache/camel/component/knative/ce/V01.java | 102 ++++++ .../org/apache/camel/component/knative/ce/V02.java | 102 ++++++ .../component/knative/CloudEventsV01Test.java | 349 +++++++++++++++++++++ .../component/knative/CloudEventsV02Test.java | 349 +++++++++++++++++++++ 9 files changed, 1019 insertions(+), 100 deletions(-) diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java index 264f644..60b7835 100644 --- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java +++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java @@ -30,8 +30,6 @@ public class KnativeComponent extends DefaultComponent { private final KnativeConfiguration configuration; private String environmentPath; - private boolean jsonSerializationEnabled; - public KnativeComponent() { this(null); } @@ -74,6 +72,22 @@ public class KnativeComponent extends DefaultComponent { configuration.setEnvironment(environment); } + public boolean isJsonSerializationEnabled() { + return configuration.isJsonSerializationEnabled(); + } + + public void setJsonSerializationEnabled(boolean jsonSerializationEnabled) { + configuration.setJsonSerializationEnabled(jsonSerializationEnabled); + } + + public String getCloudEventsSpecVersion() { + return configuration.getCloudEventsSpecVersion(); + } + + public void setCloudEventsSpecVersion(String cloudEventSpecVersion) { + configuration.setCloudEventsSpecVersion(cloudEventSpecVersion); + } + // ************************ // // @@ -105,11 +119,11 @@ public class KnativeComponent extends DefaultComponent { String envConfig = System.getenv(CONFIGURATION_ENV_VARIABLE); if (environmentPath != null) { conf.setEnvironment( - KnativeEnvironment.mandatoryLoadFromResource(getCamelContext(), this.environmentPath) + KnativeEnvironment.mandatoryLoadFromResource(getCamelContext(), this.environmentPath) ); } else if (envConfig != null) { conf.setEnvironment( - KnativeEnvironment.mandatoryLoadFromSerializedString(getCamelContext(), envConfig) + KnativeEnvironment.mandatoryLoadFromSerializedString(getCamelContext(), envConfig) ); } else { throw new IllegalStateException("Cannot load Knative configuration from file or env variable"); @@ -118,12 +132,4 @@ public class KnativeComponent extends DefaultComponent { return conf; } - - public boolean isJsonSerializationEnabled() { - return jsonSerializationEnabled; - } - - public void setJsonSerializationEnabled(boolean jsonSerializationEnabled) { - this.jsonSerializationEnabled = jsonSerializationEnabled; - } } diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java index cc3fa65..967f7dd 100644 --- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java +++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java @@ -25,6 +25,12 @@ public class KnativeConfiguration implements Cloneable { @Metadata(required = "true") private KnativeEnvironment environment; + @UriParam(defaultValue = "false") + private boolean jsonSerializationEnabled; + + @UriParam(defaultValue = "0.1", enums = "0.1,0.2") + private String cloudEventsSpecVersion = "0.1"; + public KnativeConfiguration() { } @@ -45,6 +51,22 @@ public class KnativeConfiguration implements Cloneable { this.environment = environment; } + public boolean isJsonSerializationEnabled() { + return jsonSerializationEnabled; + } + + public void setJsonSerializationEnabled(boolean jsonSerializationEnabled) { + this.jsonSerializationEnabled = jsonSerializationEnabled; + } + + public String getCloudEventsSpecVersion() { + return cloudEventsSpecVersion; + } + + public void setCloudEventsSpecVersion(String cloudEventsSpecVersion) { + this.cloudEventsSpecVersion = cloudEventsSpecVersion; + } + // ************************ // // Cloneable diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java index 89c7f3b..cceb218 100644 --- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java +++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java @@ -16,15 +16,17 @@ */ package org.apache.camel.component.knative; +import java.util.HashMap; +import java.util.Map; + import org.apache.camel.CamelContext; import org.apache.camel.Consumer; import org.apache.camel.DelegateEndpoint; import org.apache.camel.Endpoint; -import org.apache.camel.Exchange; -import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.cloud.ServiceDefinition; +import org.apache.camel.component.knative.ce.CloudEventsProcessors; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.processor.Pipeline; import org.apache.camel.spi.Metadata; @@ -35,16 +37,6 @@ import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; import org.apache.camel.util.StringHelper; import org.apache.camel.util.URISupport; -import org.apache.commons.lang3.StringUtils; - -import java.io.InputStream; -import java.time.ZoneId; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.camel.util.ObjectHelper.ifNotEmpty; @UriEndpoint( @@ -107,73 +99,18 @@ public class KnativeEndpoint extends DefaultEndpoint implements DelegateEndpoint @Override public Producer createProducer() throws Exception { - return new KnativeProducer( - this, - exchange -> { - final String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE); - final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE); - final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault()); - final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created); - final Map<String, Object> headers = exchange.getIn().getHeaders(); - - headers.putIfAbsent("CE-CloudEventsVersion", "0.1"); - headers.putIfAbsent("CE-EventType", eventType); - headers.putIfAbsent("CE-EventID", exchange.getExchangeId()); - headers.putIfAbsent("CE-EventTime", eventTime); - headers.putIfAbsent("CE-Source", getEndpointUri()); - headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType); - - // Always remove host so it's always computed from the URL and not inherited from the exchange - headers.remove("Host"); - }, - new KnativeConversionProcessor(getComponent().isJsonSerializationEnabled()), - endpoint.createProducer() - ); + final String version = configuration.getCloudEventsSpecVersion(); + final Processor ceProcessor = CloudEventsProcessors.forSpecversion(version).producerProcessor(this); + final Processor ceConverter = new KnativeConversionProcessor(configuration.isJsonSerializationEnabled()); + + return new KnativeProducer(this, ceProcessor, ceConverter, endpoint.createProducer()); } @Override public Consumer createConsumer(Processor processor) throws Exception { - final Processor pipeline = Pipeline.newInstance( - getCamelContext(), - exchange -> { - if (!KnativeSupport.hasStructuredContent(exchange)) { - // - // The event is not in the form of Structured Content Mode - // then leave it as it is. - // - // Note that this is true for http binding only. - // - // More info: - // - // https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode - // - return; - } - - try (InputStream is = exchange.getIn().getBody(InputStream.class)) { - final Message message = exchange.getIn(); - final Map<String, Object> ce = Knative.MAPPER.readValue(is, Map.class); - - ifNotEmpty(ce.remove("contentType"), val -> message.setHeader(Exchange.CONTENT_TYPE, val)); - ifNotEmpty(ce.remove("data"), val -> message.setBody(val)); - - // - // Map extensions to standard camel headers - // - ifNotEmpty(ce.remove("extensions"), val -> { - if (val instanceof Map) { - ((Map<String, Object>) val).forEach(message::setHeader); - } - }); - - ce.forEach((key, val) -> { - message.setHeader("CE-" + StringUtils.capitalize(key), val); - }); - } - }, - processor - ); - + final String version = configuration.getCloudEventsSpecVersion(); + final Processor ceProcessor = CloudEventsProcessors.forSpecversion(version).consumerProcessor(this); + final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor); final Consumer consumer = endpoint.createConsumer(pipeline); configureConsumer(consumer); diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeSupport.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeSupport.java index f84ac46..9c6c049 100644 --- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeSupport.java +++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeSupport.java @@ -16,15 +16,11 @@ */ package org.apache.camel.component.knative; -import java.io.UnsupportedEncodingException; -import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; import java.util.Objects; import org.apache.camel.Exchange; -import org.apache.camel.util.CollectionHelper; -import org.apache.camel.util.URISupport; public final class KnativeSupport { private KnativeSupport() { @@ -47,13 +43,4 @@ public final class KnativeSupport { return answer; } - - public static String appendParametersToURI(String uri, String key, Object value, Object... keyVals) - throws UnsupportedEncodingException, URISyntaxException { - - return URISupport.appendParametersToURI( - uri, - CollectionHelper.mapOf(key, value, keyVals) - ); - } } diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventsProcessors.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventsProcessors.java new file mode 100644 index 0000000..49ebee4d --- /dev/null +++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventsProcessors.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.knative.ce; + +import java.util.function.Function; + +import org.apache.camel.Processor; +import org.apache.camel.component.knative.KnativeEndpoint; + +public enum CloudEventsProcessors { + v01("0.1", V01.PRODUCER, V01.CONSUMER), + v02("0.2", V02.PRODUCER, V02.CONSUMER); + + private final String version; + private final Function<KnativeEndpoint, Processor> producer; + private final Function<KnativeEndpoint, Processor> consumer; + + CloudEventsProcessors(String version, Function<KnativeEndpoint, Processor> producer, Function<KnativeEndpoint, Processor> consumer) { + this.version = version; + this.producer = producer; + this.consumer = consumer; + } + + public String getVersion() { + return version; + } + + public Processor producerProcessor(KnativeEndpoint endpoint) { + return this.producer.apply(endpoint); + } + + public Processor consumerProcessor(KnativeEndpoint endpoint) { + return this.consumer.apply(endpoint); + } + + // ************************** + // + // Helpers + // + // ************************** + + public static CloudEventsProcessors forSpecversion(String version) { + for (CloudEventsProcessors ce : CloudEventsProcessors.values()) { + if (ce.version.equals(version)) { + return ce; + } + } + + throw new IllegalArgumentException("Unable to find processors for spec version: " + version); + } +} diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V01.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V01.java new file mode 100644 index 0000000..4505d8e --- /dev/null +++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V01.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.knative.ce; + +import java.io.InputStream; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import java.util.function.Function; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.component.knative.Knative; +import org.apache.camel.component.knative.KnativeEndpoint; +import org.apache.camel.component.knative.KnativeEnvironment; +import org.apache.camel.component.knative.KnativeSupport; +import org.apache.commons.lang3.StringUtils; + +import static org.apache.camel.util.ObjectHelper.ifNotEmpty; + +final class V01 { + private V01() { + } + + public static final Function<KnativeEndpoint, Processor> PRODUCER = (KnativeEndpoint endpoint) -> { + KnativeEnvironment.KnativeServiceDefinition service = endpoint.getService(); + String uri = endpoint.getEndpointUri(); + + return exchange -> { + final String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE); + final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE); + final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault()); + final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created); + final Map<String, Object> headers = exchange.getIn().getHeaders(); + + headers.putIfAbsent("CE-CloudEventsVersion", "0.1"); + headers.putIfAbsent("CE-EventType", eventType); + headers.putIfAbsent("CE-EventID", exchange.getExchangeId()); + headers.putIfAbsent("CE-EventTime", eventTime); + headers.putIfAbsent("CE-Source", uri); + headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType); + + // Always remove host so it's always computed from the URL and not inherited from the exchange + headers.remove("Host"); + }; + }; + + public static final Function<KnativeEndpoint, Processor> CONSUMER = (KnativeEndpoint endpoint) -> { + return exchange -> { + if (!KnativeSupport.hasStructuredContent(exchange)) { + // + // The event is not in the form of Structured Content Mode + // then leave it as it is. + // + // Note that this is true for http binding only. + // + // More info: + // + // https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode + // + return; + } + + try (InputStream is = exchange.getIn().getBody(InputStream.class)) { + final Message message = exchange.getIn(); + final Map<String, Object> ce = Knative.MAPPER.readValue(is, Map.class); + + ifNotEmpty(ce.remove("contentType"), val -> message.setHeader(Exchange.CONTENT_TYPE, val)); + ifNotEmpty(ce.remove("data"), val -> message.setBody(val)); + + // + // Map extensions to standard camel headers + // + ifNotEmpty(ce.remove("extensions"), val -> { + if (val instanceof Map) { + ((Map<String, Object>) val).forEach(message::setHeader); + } + }); + + ce.forEach((key, val) -> { + message.setHeader("CE-" + StringUtils.capitalize(key), val); + }); + } + }; + }; +} diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V02.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V02.java new file mode 100644 index 0000000..321c502 --- /dev/null +++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V02.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.knative.ce; + +import java.io.InputStream; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import java.util.function.Function; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.component.knative.Knative; +import org.apache.camel.component.knative.KnativeEndpoint; +import org.apache.camel.component.knative.KnativeEnvironment; +import org.apache.camel.component.knative.KnativeSupport; +import org.apache.commons.lang3.StringUtils; + +import static org.apache.camel.util.ObjectHelper.ifNotEmpty; + +final class V02 { + private V02() { + } + + public static final Function<KnativeEndpoint, Processor> PRODUCER = (KnativeEndpoint endpoint) -> { + KnativeEnvironment.KnativeServiceDefinition service = endpoint.getService(); + String uri = endpoint.getEndpointUri(); + + return exchange -> { + final String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE); + final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE); + final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault()); + final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created); + final Map<String, Object> headers = exchange.getIn().getHeaders(); + + headers.putIfAbsent("ce-specversion", "0.2"); + headers.putIfAbsent("ce-type", eventType); + headers.putIfAbsent("ce-id", exchange.getExchangeId()); + headers.putIfAbsent("ce-time", eventTime); + headers.putIfAbsent("ce-source", uri); + headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType); + + // Always remove host so it's always computed from the URL and not inherited from the exchange + headers.remove("Host"); + }; + }; + + public static final Function<KnativeEndpoint, Processor> CONSUMER = (KnativeEndpoint endpoint) -> { + return exchange -> { + if (!KnativeSupport.hasStructuredContent(exchange)) { + // + // The event is not in the form of Structured Content Mode + // then leave it as it is. + // + // Note that this is true for http binding only. + // + // More info: + // + // https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode + // + return; + } + + try (InputStream is = exchange.getIn().getBody(InputStream.class)) { + final Message message = exchange.getIn(); + final Map<String, Object> ce = Knative.MAPPER.readValue(is, Map.class); + + ifNotEmpty(ce.remove("contentType"), val -> message.setHeader(Exchange.CONTENT_TYPE, val)); + ifNotEmpty(ce.remove("data"), val -> message.setBody(val)); + + // + // Map extensions to standard camel headers + // + ifNotEmpty(ce.remove("extensions"), val -> { + if (val instanceof Map) { + ((Map<String, Object>) val).forEach(message::setHeader); + } + }); + + ce.forEach((key, val) -> { + message.setHeader("ce-" + StringUtils.lowerCase(key), val); + }); + } + }; + }; +} diff --git a/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java b/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java new file mode 100644 index 0000000..f5a9ecc --- /dev/null +++ b/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java @@ -0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.knative; + +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.cloud.ServiceDefinition; +import org.apache.camel.component.knative.ce.CloudEventsProcessors; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.test.AvailablePortFinder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.apache.camel.util.CollectionHelper.mapOf; + +public class CloudEventsV01Test { + + private CamelContext context; + + // ************************** + // + // Setup + // + // ************************** + + @BeforeEach + public void before() { + this.context = new DefaultCamelContext(); + } + + @AfterEach + public void after() throws Exception { + if (this.context != null) { + this.context.stop(); + } + } + + // ************************** + // + // Tests + // + // ************************** + + @Test + void testInvokeEndpoint() throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); + + KnativeEnvironment env = new KnativeEnvironment(Arrays.asList( + new KnativeEnvironment.KnativeServiceDefinition( + Knative.Type.endpoint, + Knative.Protocol.http, + "myEndpoint", + "localhost", + port, + mapOf( + ServiceDefinition.SERVICE_META_PATH, "/a/path", + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain" + )) + )); + + KnativeComponent component = context.getComponent("knative", KnativeComponent.class); + component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion()); + component.setEnvironment(env); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:source") + .to("knative:endpoint/myEndpoint"); + + fromF("netty4-http:http://localhost:%d/a/path", port) + .to("mock:ce"); + } + }); + + context.start(); + + MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); + mock.expectedMessageCount(1); + mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion()); + mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event"); + mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint"); + mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); + mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); + mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID")); + mock.expectedBodiesReceived("test"); + + context.createProducerTemplate().send( + "direct:source", + e -> { + e.getIn().setBody("test"); + } + ); + + mock.assertIsSatisfied(); + } + + @Test + void testConsumeStructuredContent() throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); + + KnativeEnvironment env = new KnativeEnvironment(Arrays.asList( + new KnativeEnvironment.KnativeServiceDefinition( + Knative.Type.endpoint, + Knative.Protocol.http, + "myEndpoint", + "localhost", + port, + mapOf( + ServiceDefinition.SERVICE_META_PATH, "/a/path", + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain" + )) + )); + + KnativeComponent component = context.getComponent("knative", KnativeComponent.class); + component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion()); + component.setEnvironment(env); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("knative:endpoint/myEndpoint") + .to("mock:ce"); + + from("direct:source") + .toF("netty4-http:http://localhost:%d/a/path", port); + } + }); + + context.start(); + + MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); + mock.expectedMessageCount(1); + mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion()); + mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event"); + mock.expectedHeaderReceived("CE-EventID", "myEventID"); + mock.expectedHeaderReceived("CE-Source", "/somewhere"); + mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE); + mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); + mock.expectedBodiesReceived("test"); + + context.createProducerTemplate().send( + "direct:source", + e -> { + e.getIn().setHeader(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE); + e.getIn().setBody(new ObjectMapper().writeValueAsString(mapOf( + "cloudEventsVersion", CloudEventsProcessors.v01.getVersion(), + "eventType", "org.apache.camel.event", + "eventID", "myEventID", + "eventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()), + "source", "/somewhere", + "data", "test" + ))); + } + ); + + mock.assertIsSatisfied(); + } + + @Test + void testConsumeContent() throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); + + KnativeEnvironment env = new KnativeEnvironment(Arrays.asList( + new KnativeEnvironment.KnativeServiceDefinition( + Knative.Type.endpoint, + Knative.Protocol.http, + "myEndpoint", + "localhost", + port, + mapOf( + ServiceDefinition.SERVICE_META_PATH, "/a/path", + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain" + )) + )); + + KnativeComponent component = context.getComponent("knative", KnativeComponent.class); + component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion()); + component.setEnvironment(env); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("knative:endpoint/myEndpoint") + .to("mock:ce"); + + from("direct:source") + .toF("http4://localhost:%d/a/path", port); + } + }); + + context.start(); + + MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); + mock.expectedMessageCount(1); + mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion()); + mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event"); + mock.expectedHeaderReceived("CE-EventID", "myEventID"); + mock.expectedHeaderReceived("CE-Source", "/somewhere"); + mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); + mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); + mock.expectedBodiesReceived("test"); + + context.createProducerTemplate().send( + "direct:source", + e -> { + e.getIn().setHeader(Exchange.CONTENT_TYPE, "text/plain"); + e.getIn().setHeader("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion()); + e.getIn().setHeader("CE-EventType", "org.apache.camel.event"); + e.getIn().setHeader("CE-EventID", "myEventID"); + e.getIn().setHeader("CE-EventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getIn().setHeader("CE-Source", "/somewhere"); + e.getIn().setBody("test"); + } + ); + + mock.assertIsSatisfied(); + } + + @Test + void testConsumeContentWithFilter() throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); + + KnativeEnvironment env = new KnativeEnvironment(Arrays.asList( + new KnativeEnvironment.KnativeServiceDefinition( + Knative.Type.endpoint, + Knative.Protocol.http, + "ep1", + "localhost", + port, + mapOf( + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain", + Knative.FILTER_HEADER_NAME, "CE-Source", + Knative.FILTER_HEADER_VALUE, "CE1" + )), + new KnativeEnvironment.KnativeServiceDefinition( + Knative.Type.endpoint, + Knative.Protocol.http, + "ep2", + "localhost", + port, + mapOf( + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain", + Knative.FILTER_HEADER_NAME, "CE-Source", + Knative.FILTER_HEADER_VALUE, "CE2" + )) + )); + + KnativeComponent component = context.getComponent("knative", KnativeComponent.class); + component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion()); + component.setEnvironment(env); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("knative:endpoint/ep1") + .convertBodyTo(String.class) + .to("log:ce1?showAll=true&multiline=true") + .to("mock:ce1"); + from("knative:endpoint/ep2") + .convertBodyTo(String.class) + .to("log:ce2?showAll=true&multiline=true") + .to("mock:ce2"); + + from("direct:source") + .setBody() + .constant("test") + .setHeader(Exchange.HTTP_METHOD) + .constant("POST") + .setHeader(Exchange.HTTP_QUERY) + .simple("filter.headerName=CE-Source&filter.headerValue=${header.FilterVal}") + .toD("http4://localhost:" + port); + } + }); + + context.start(); + + MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class); + mock1.expectedMessageCount(1); + mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); + mock1.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion()); + mock1.expectedHeaderReceived("CE-EventType", "org.apache.camel.event"); + mock1.expectedHeaderReceived("CE-EventID", "myEventID1"); + mock1.expectedHeaderReceived("CE-Source", "CE1"); + mock1.expectedBodiesReceived("test"); + + MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class); + mock2.expectedMessageCount(1); + mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); + mock2.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion()); + mock2.expectedHeaderReceived("CE-EventType", "org.apache.camel.event"); + mock2.expectedHeaderReceived("CE-EventID", "myEventID2"); + mock2.expectedHeaderReceived("CE-Source", "CE2"); + mock2.expectedBodiesReceived("test"); + + context.createProducerTemplate().send( + "direct:source", + e -> { + e.getIn().setHeader("FilterVal", "CE1"); + e.getIn().setHeader("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion()); + e.getIn().setHeader("CE-EventType", "org.apache.camel.event"); + e.getIn().setHeader("CE-EventID", "myEventID1"); + e.getIn().setHeader("CE-EventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getIn().setHeader("CE-Source", "CE1"); + } + ); + context.createProducerTemplate().send( + "direct:source", + e -> { + e.getIn().setHeader("FilterVal", "CE2"); + e.getIn().setHeader("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion()); + e.getIn().setHeader("CE-EventType", "org.apache.camel.event"); + e.getIn().setHeader("CE-EventID", "myEventID2"); + e.getIn().setHeader("CE-EventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getIn().setHeader("CE-Source", "CE2"); + } + ); + + mock1.assertIsSatisfied(); + mock2.assertIsSatisfied(); + } +} diff --git a/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java b/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java new file mode 100644 index 0000000..3c32915 --- /dev/null +++ b/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java @@ -0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.knative; + +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.cloud.ServiceDefinition; +import org.apache.camel.component.knative.ce.CloudEventsProcessors; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.test.AvailablePortFinder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.apache.camel.util.CollectionHelper.mapOf; + +public class CloudEventsV02Test { + + private CamelContext context; + + // ************************** + // + // Setup + // + // ************************** + + @BeforeEach + public void before() { + this.context = new DefaultCamelContext(); + } + + @AfterEach + public void after() throws Exception { + if (this.context != null) { + this.context.stop(); + } + } + + // ************************** + // + // Tests + // + // ************************** + + @Test + void testInvokeEndpoint() throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); + + KnativeEnvironment env = new KnativeEnvironment(Arrays.asList( + new KnativeEnvironment.KnativeServiceDefinition( + Knative.Type.endpoint, + Knative.Protocol.http, + "myEndpoint", + "localhost", + port, + mapOf( + ServiceDefinition.SERVICE_META_PATH, "/a/path", + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain" + )) + )); + + KnativeComponent component = context.getComponent("knative", KnativeComponent.class); + component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion()); + component.setEnvironment(env); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:source") + .to("knative:endpoint/myEndpoint"); + + fromF("netty4-http:http://localhost:%d/a/path", port) + .to("mock:ce"); + } + }); + + context.start(); + + MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); + mock.expectedMessageCount(1); + mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion()); + mock.expectedHeaderReceived("ce-type", "org.apache.camel.event"); + mock.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint"); + mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); + mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time")); + mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id")); + mock.expectedBodiesReceived("test"); + + context.createProducerTemplate().send( + "direct:source", + e -> { + e.getIn().setBody("test"); + } + ); + + mock.assertIsSatisfied(); + } + + @Test + void testConsumeStructuredContent() throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); + + KnativeEnvironment env = new KnativeEnvironment(Arrays.asList( + new KnativeEnvironment.KnativeServiceDefinition( + Knative.Type.endpoint, + Knative.Protocol.http, + "myEndpoint", + "localhost", + port, + mapOf( + ServiceDefinition.SERVICE_META_PATH, "/a/path", + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain" + )) + )); + + KnativeComponent component = context.getComponent("knative", KnativeComponent.class); + component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion()); + component.setEnvironment(env); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("knative:endpoint/myEndpoint") + .to("mock:ce"); + + from("direct:source") + .toF("netty4-http:http://localhost:%d/a/path", port); + } + }); + + context.start(); + + MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); + mock.expectedMessageCount(1); + mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion()); + mock.expectedHeaderReceived("ce-type", "org.apache.camel.event"); + mock.expectedHeaderReceived("ce-id", "myEventID"); + mock.expectedHeaderReceived("ce-source", "/somewhere"); + mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE); + mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time")); + mock.expectedBodiesReceived("test"); + + context.createProducerTemplate().send( + "direct:source", + e -> { + e.getIn().setHeader(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE); + e.getIn().setBody(new ObjectMapper().writeValueAsString(mapOf( + "specversion", CloudEventsProcessors.v02.getVersion(), + "type", "org.apache.camel.event", + "id", "myEventID", + "time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()), + "source", "/somewhere", + "data", "test" + ))); + } + ); + + mock.assertIsSatisfied(); + } + + @Test + void testConsumeContent() throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); + + KnativeEnvironment env = new KnativeEnvironment(Arrays.asList( + new KnativeEnvironment.KnativeServiceDefinition( + Knative.Type.endpoint, + Knative.Protocol.http, + "myEndpoint", + "localhost", + port, + mapOf( + ServiceDefinition.SERVICE_META_PATH, "/a/path", + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain" + )) + )); + + KnativeComponent component = context.getComponent("knative", KnativeComponent.class); + component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion()); + component.setEnvironment(env); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("knative:endpoint/myEndpoint") + .to("mock:ce"); + + from("direct:source") + .toF("http4://localhost:%d/a/path", port); + } + }); + + context.start(); + + MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); + mock.expectedMessageCount(1); + mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion()); + mock.expectedHeaderReceived("ce-type", "org.apache.camel.event"); + mock.expectedHeaderReceived("ce-id", "myEventID"); + mock.expectedHeaderReceived("ce-source", "/somewhere"); + mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); + mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time")); + mock.expectedBodiesReceived("test"); + + context.createProducerTemplate().send( + "direct:source", + e -> { + e.getIn().setHeader(Exchange.CONTENT_TYPE, "text/plain"); + e.getIn().setHeader("ce-specversion", CloudEventsProcessors.v02.getVersion()); + e.getIn().setHeader("ce-type", "org.apache.camel.event"); + e.getIn().setHeader("ce-id", "myEventID"); + e.getIn().setHeader("ce-time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getIn().setHeader("ce-source", "/somewhere"); + e.getIn().setBody("test"); + } + ); + + mock.assertIsSatisfied(); + } + + @Test + void testConsumeContentWithFilter() throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); + + KnativeEnvironment env = new KnativeEnvironment(Arrays.asList( + new KnativeEnvironment.KnativeServiceDefinition( + Knative.Type.endpoint, + Knative.Protocol.http, + "ep1", + "localhost", + port, + mapOf( + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain", + Knative.FILTER_HEADER_NAME, "ce-source", + Knative.FILTER_HEADER_VALUE, "CE1" + )), + new KnativeEnvironment.KnativeServiceDefinition( + Knative.Type.endpoint, + Knative.Protocol.http, + "ep2", + "localhost", + port, + mapOf( + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain", + Knative.FILTER_HEADER_NAME, "ce-source", + Knative.FILTER_HEADER_VALUE, "CE2" + )) + )); + + KnativeComponent component = context.getComponent("knative", KnativeComponent.class); + component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion()); + component.setEnvironment(env); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("knative:endpoint/ep1") + .convertBodyTo(String.class) + .to("log:ce1?showAll=true&multiline=true") + .to("mock:ce1"); + from("knative:endpoint/ep2") + .convertBodyTo(String.class) + .to("log:ce2?showAll=true&multiline=true") + .to("mock:ce2"); + + from("direct:source") + .setBody() + .constant("test") + .setHeader(Exchange.HTTP_METHOD) + .constant("POST") + .setHeader(Exchange.HTTP_QUERY) + .simple("filter.headerName=ce-source&filter.headerValue=${header.FilterVal}") + .toD("http4://localhost:" + port); + } + }); + + context.start(); + + MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class); + mock1.expectedMessageCount(1); + mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time")); + mock1.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion()); + mock1.expectedHeaderReceived("ce-type", "org.apache.camel.event"); + mock1.expectedHeaderReceived("ce-id", "myEventID1"); + mock1.expectedHeaderReceived("ce-source", "CE1"); + mock1.expectedBodiesReceived("test"); + + MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class); + mock2.expectedMessageCount(1); + mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time")); + mock2.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion()); + mock2.expectedHeaderReceived("ce-type", "org.apache.camel.event"); + mock2.expectedHeaderReceived("ce-id", "myEventID2"); + mock2.expectedHeaderReceived("ce-source", "CE2"); + mock2.expectedBodiesReceived("test"); + + context.createProducerTemplate().send( + "direct:source", + e -> { + e.getIn().setHeader("FilterVal", "CE1"); + e.getIn().setHeader("ce-specversion", CloudEventsProcessors.v02.getVersion()); + e.getIn().setHeader("ce-type", "org.apache.camel.event"); + e.getIn().setHeader("ce-id", "myEventID1"); + e.getIn().setHeader("ce-time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getIn().setHeader("ce-source", "CE1"); + } + ); + context.createProducerTemplate().send( + "direct:source", + e -> { + e.getIn().setHeader("FilterVal", "CE2"); + e.getIn().setHeader("ce-specversion", CloudEventsProcessors.v02.getVersion()); + e.getIn().setHeader("ce-type", "org.apache.camel.event"); + e.getIn().setHeader("ce-id", "myEventID2"); + e.getIn().setHeader("ce-time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getIn().setHeader("ce-source", "CE2"); + } + ); + + mock1.assertIsSatisfied(); + mock2.assertIsSatisfied(); + } +}