This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c86fe38266eb3181a9942993cecff65fdee1a511 Author: Jia Zhai <zhai...@apache.org> AuthorDate: Thu Jul 2 01:41:31 2020 -0500 [client authentication] add authentication client with oauth2 support (#7420) ### Motivation Pulsar supports authenticating clients using OAuth 2.0 access tokens. You can use tokens to identify a Pulsar client and associate it with some "principal" (or "role") that is permitted to do some actions (e.g., publish to a topic or consume from a topic). This module adds a Pulsar client authentication plugin that supports OAuth 2.0 directly. The client communicates with the OAuth 2.0 server to obtain an `access token`, and then passes this `access token` to the Pulsar broker for authentication. The broker side can therefore keep using `org.apache.pulsar.broker.authentication.AuthenticationProviderToken`; users can also add their own `AuthenticationProvider` to work with this module. ### Modifications - add related code; - add related test; - add related doc. This client authentication module is initialized as follows: ```java Authentication oauth2Authentication = AuthenticationFactoryOAuth2.clientCredentials( new URL("https://dev-kt-aa9ne.us.auth0.com/oauth/token"), new URL("file:///path/to/credential/file.json"), // key file path "https://dev-kt-aa9ne.us.auth0.com/api/v2/" ); PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://broker.example.com:6650/") .authentication(oauth2Authentication) .build(); ``` ### Verifying this change Tests passed. 
(cherry picked from commit 768813ea595697e317a0ac5af53191e6cf832766) --- .../TokenAuthenticatedProducerConsumerTest.java | 143 ++++++++++++++++++ ...kenOauth2AuthenticatedProducerConsumerTest.java | 160 +++++++++++++++++++++ .../authentication/token/credentials_file.json | 4 + .../impl/auth/oauth2/AuthenticationDataOAuth2.java | 60 ++++++++ .../auth/oauth2/AuthenticationFactoryOAuth2.java | 47 ++++++ .../impl/auth/oauth2/AuthenticationOAuth2.java | 131 +++++++++++++++++ .../impl/auth/oauth2/ClientCredentialsFlow.java | 146 +++++++++++++++++++ .../pulsar/client/impl/auth/oauth2/Flow.java | 47 ++++++ .../pulsar/client/impl/auth/oauth2/FlowBase.java | 80 +++++++++++ .../pulsar/client/impl/auth/oauth2/KeyFile.java | 66 +++++++++ .../pulsar/client/impl/auth/oauth2/Readme.md | 94 ++++++++++++ .../client/impl/auth/oauth2/package-info.java | 19 +++ .../protocol/ClientCredentialsExchangeRequest.java | 42 ++++++ .../protocol/ClientCredentialsExchanger.java | 41 ++++++ .../oauth2/protocol/DefaultMetadataResolver.java | 105 ++++++++++++++ .../client/impl/auth/oauth2/protocol/Metadata.java | 54 +++++++ .../auth/oauth2/protocol/MetadataResolver.java | 28 ++++ .../impl/auth/oauth2/protocol/TokenClient.java | 121 ++++++++++++++++ .../impl/auth/oauth2/protocol/TokenError.java | 41 ++++++ .../oauth2/protocol/TokenExchangeException.java | 35 +++++ .../impl/auth/oauth2/protocol/TokenResult.java | 51 +++++++ .../impl/auth/oauth2/protocol/package-info.java | 19 +++ .../impl/auth/oauth2/AuthenticationOAuth2Test.java | 122 ++++++++++++++++ .../pulsar/client/impl/auth/oauth2/MockClock.java | 97 +++++++++++++ 24 files changed, 1753 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java new file mode 100644 index 0000000..874a34d --- /dev/null +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static org.mockito.Mockito.spy; + +import com.google.common.collect.Sets; +import java.net.URI; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** + * Test Token authentication with: + * client: org.apache.pulsar.client.impl.auth.AuthenticationToken + * broker: org.apache.pulsar.broker.authentication.AuthenticationProviderToken + */ +public class TokenAuthenticatedProducerConsumerTest extends ProducerConsumerBase { + private static final Logger log = 
LoggerFactory.getLogger(TokenAuthenticatedProducerConsumerTest.class); + + // pre-create a public/private_key pair. Public key used for broker to verify client passed in token + private final String TOKEN_PUBLIC_KEY = "data:;base64,MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAhHKgdY6arG7eE75bUPtznN5WjMu0sxLq7pI5Aaiw2Ijerbz33iO/Fdd2yJVuAZNDZPD/AVSaeliEh/BP+s2rN8KNuiywD+SlL1NGf2JDS5BvGT4Q8eHfDDRd/iY5zkK58wYwlke6C8fKCx10MTH9iYAJpzaaxs+Tu1RaatK+691aYSiMkYIfgbqAKmSCpK+48al/PkmENfuhzaTBPhCnEblhNvUhS5MjzBcAcGzecpEuVSxUzDtm8rU8DEQR6kkdXS1QnGHVNis/vgk8QzctkJKbtgDIaGzNUmDvTCyPZ8WLWSWJWb1oPxRZwpfXVP69ijU0Rme4/YkuHt6IEw6ANQIDAQAB"; + // admin token created based on private_key. + private final String ADMIN_TOKEN = "eyJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MTYyNTEzNjQyMn0.DAfbUPZwQURgGvor4scO0NoqoyHkCulKZkhP7kksCWFvgx6B22iKuXGX42EFlFSRMWYYgIJXV7UZATCLCjJpn_ijrO6AWBmooib3f94OPoLDdkF3qXnqaLnvJtl8_sCoLCSghR_O3hQFgQW2GRjMDdfJgl2_HXCWuzedtI5cQJdbpfU0NU10nzo7RtrpCmUdgQYQEHegYOawLqQVvr53ZGjrZilBXY9HHz1mSlnwZGNGVNNdvRthBuGtXtfKgtfSDF5jLqABvK8TUpdNJybibeiOspdzuY19-wVt4eVXzNAGsP4V4Zs91MgIUYV5lWKnBUuVWalppkMWhRF4Jf-KWQ"; + + @BeforeMethod + @Override + protected void setup() throws Exception { + conf.setAuthenticationEnabled(true); + conf.setAuthorizationEnabled(true); + + Set<String> superUserRoles = new HashSet<>(); + superUserRoles.add("admin"); + conf.setSuperUserRoles(superUserRoles); + + Set<String> providers = new HashSet<>(); + providers.add(AuthenticationProviderToken.class.getName()); + conf.setAuthenticationProviders(providers); + + conf.setClusterName("test"); + + // Set provider domain name + Properties properties = new Properties(); + properties.setProperty("tokenPublicKey", TOKEN_PUBLIC_KEY); + + conf.setProperties(properties); + super.init(); + } + + // setup both admin and pulsar client + protected final void clientSetup() throws Exception { + admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + 
.authentication(AuthenticationFactory.token(ADMIN_TOKEN)) + .build()); + + pulsarClient = PulsarClient.builder().serviceUrl(new URI(pulsar.getBrokerServiceUrl()).toString()) + .statsInterval(0, TimeUnit.SECONDS) + .authentication(AuthenticationFactory.token(ADMIN_TOKEN)) + .build(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @DataProvider(name = "batch") + public Object[][] codecProvider() { + return new Object[][] { { 0 }, { 1000 } }; + } + + public void testSyncProducerAndConsumer() throws Exception { + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic") + .subscriptionName("my-subscriber-name").subscribe(); + + ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic"); + + Producer<byte[]> producer = producerBuilder.create(); + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + Message<byte[]> msg = null; + Set<String> messageSet = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + } + + @Test + public void testTokenProducerAndConsumer() throws Exception { + log.info("-- Starting {} test --", methodName); + clientSetup(); + + // test rest by admin + admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); + admin.tenants().createTenant("my-property", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); + admin.namespaces().createNamespace("my-property/my-ns", 
Sets.newHashSet("test")); + + // test protocol by producer/consumer + testSyncProducerAndConsumer(); + + log.info("-- Exiting {} test --", methodName); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java new file mode 100644 index 0000000..b54d8c9 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.api; + +import static org.mockito.Mockito.spy; + +import com.google.common.collect.Sets; +import java.net.URI; +import java.net.URL; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** + * Test Token authentication with: + * client: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 + * broker: org.apache.pulsar.broker.authentication.AuthenticationProviderToken + */ +public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsumerBase { + private static final Logger log = LoggerFactory.getLogger(TokenOauth2AuthenticatedProducerConsumerTest.class); + + // public key in oauth2 server to verify the client passed in token. 
get from https://jwt.io/ + private final String TOKEN_TEST_PUBLIC_KEY = "data:;base64,MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2tZd/4gJda3U2Pc3tpgRAN7JPGWx/Gn17v/0IiZlNNRbP/Mmf0Vc6G1qsnaRaWNWOR+t6/a6ekFHJMikQ1N2X6yfz4UjMc8/G2FDPRmWjA+GURzARjVhxc/BBEYGoD0Kwvbq/u9CZm2QjlKrYaLfg3AeB09j0btNrDJ8rBsNzU6AuzChRvXj9IdcE/A/4N/UQ+S9cJ4UXP6NJbToLwajQ5km+CnxdGE6nfB7LWHvOFHjn9C2Rb9e37CFlmeKmIVFkagFM0gbmGOb6bnGI8Bp/VNGV0APef4YaBvBTqwoZ1Z4aDHy5eRxXfAMdtBkBupmBXqL6bpd15XRYUbu/7ck9QIDAQAB"; + + private final String ADMIN_ROLE = "Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x@clients"; + + // Credentials File, which contains "client_id" and "client_secret" + private final String CREDENTIALS_FILE = "./src/test/resources/authentication/token/credentials_file.json"; + + @BeforeMethod + @Override + protected void setup() throws Exception { + conf.setAuthenticationEnabled(true); + conf.setAuthorizationEnabled(true); + + Set<String> superUserRoles = new HashSet<>(); + superUserRoles.add(ADMIN_ROLE); + conf.setSuperUserRoles(superUserRoles); + + Set<String> providers = new HashSet<>(); + providers.add(AuthenticationProviderToken.class.getName()); + conf.setAuthenticationProviders(providers); + + conf.setClusterName("test"); + + // Set provider domain name + Properties properties = new Properties(); + properties.setProperty("tokenPublicKey", TOKEN_TEST_PUBLIC_KEY); + + conf.setProperties(properties); + super.init(); + } + + // setup both admin and pulsar client + protected final void clientSetup() throws Exception { + Path path = Paths.get(CREDENTIALS_FILE).toAbsolutePath(); + log.info("Credentials File path: {}", path.toString()); + + // AuthenticationOAuth2 + Authentication authentication = AuthenticationFactoryOAuth2.clientCredentials( + new URL("https://dev-kt-aa9ne.us.auth0.com/oauth/token"), + new URL("file://" + path.toString()), // key file path + "https://dev-kt-aa9ne.us.auth0.com/api/v2/" + ); + + admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + 
.authentication(authentication) + .build()); + + pulsarClient = PulsarClient.builder().serviceUrl(new URI(pulsar.getBrokerServiceUrl()).toString()) + .statsInterval(0, TimeUnit.SECONDS) + .authentication(authentication) + .build(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @DataProvider(name = "batch") + public Object[][] codecProvider() { + return new Object[][] { { 0 }, { 1000 } }; + } + + public void testSyncProducerAndConsumer() throws Exception { + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic") + .subscriptionName("my-subscriber-name").subscribe(); + + ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic"); + + Producer<byte[]> producer = producerBuilder.create(); + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + Message<byte[]> msg = null; + Set<String> messageSet = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + } + + @Test + public void testTokenProducerAndConsumer() throws Exception { + log.info("-- Starting {} test --", methodName); + clientSetup(); + + // test rest by admin + admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); + admin.tenants().createTenant("my-property", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); + admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test")); + + // test protocol by producer/consumer 
+ testSyncProducerAndConsumer(); + + log.info("-- Exiting {} test --", methodName); + } + +} diff --git a/pulsar-broker/src/test/resources/authentication/token/credentials_file.json b/pulsar-broker/src/test/resources/authentication/token/credentials_file.json new file mode 100644 index 0000000..db1eccd --- /dev/null +++ b/pulsar-broker/src/test/resources/authentication/token/credentials_file.json @@ -0,0 +1,4 @@ +{ + "client_id":"Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x", + "client_secret":"rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb" +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationDataOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationDataOAuth2.java new file mode 100644 index 0000000..59810f5 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationDataOAuth2.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl.auth.oauth2; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import org.apache.pulsar.client.api.AuthenticationDataProvider; + +/** + * Provide OAuth 2.0 authentication data. + */ +class AuthenticationDataOAuth2 implements AuthenticationDataProvider { + public static final String HTTP_HEADER_NAME = "Authorization"; + + private final String accessToken; + private final Set<Map.Entry<String, String>> headers; + + public AuthenticationDataOAuth2(String accessToken) { + this.accessToken = accessToken; + this.headers = Collections.singletonMap(HTTP_HEADER_NAME, "Bearer " + accessToken).entrySet(); + } + + @Override + public boolean hasDataForHttp() { + return true; + } + + @Override + public Set<Map.Entry<String, String>> getHttpHeaders() { + return this.headers; + } + + @Override + public boolean hasDataFromCommand() { + return true; + } + + @Override + public String getCommandData() { + return this.accessToken; + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java new file mode 100644 index 0000000..54da5287d --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2; + +import java.net.URL; +import java.time.Clock; +import org.apache.pulsar.client.api.Authentication; + +/** + * Factory class that allows to create {@link Authentication} instances + * for OAuth 2.0 authentication methods. + */ +public final class AuthenticationFactoryOAuth2 { + + /** + * Authenticate with client credentials. + * + * @param issuerUrl the issuer URL + * @param credentialsUrl the credentials URL + * @param audience the audience identifier + * @return an Authentication object + */ + public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience) { + ClientCredentialsFlow flow = ClientCredentialsFlow.builder() + .issuerUrl(issuerUrl) + .privateKey(credentialsUrl.toExternalForm()) + .audience(audience) + .build(); + return new AuthenticationOAuth2(flow, Clock.systemDefaultZone()); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java new file mode 100644 index 0000000..f7f41d0 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2; + +import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult; +import java.io.IOException; +import java.time.Clock; +import java.time.Instant; +import java.util.Map; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.AuthenticationUtil; + +/** + * Pulsar client authentication provider based on OAuth 2.0. 
+ */ +@Slf4j +public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticationParameterSupport { + + public static final String CONFIG_PARAM_TYPE = "type"; + public static final String TYPE_CLIENT_CREDENTIALS = "client_credentials"; + public static final String AUTH_METHOD_NAME = "token"; + public static final double EXPIRY_ADJUSTMENT = 0.9; + private static final long serialVersionUID = 1L; + + final Clock clock; + Flow flow; + transient CachedToken cachedToken; + + public AuthenticationOAuth2() { + this.clock = Clock.systemDefaultZone(); + } + + AuthenticationOAuth2(Flow flow, Clock clock) { + this.flow = flow; + this.clock = clock; + } + + @Override + public String getAuthMethodName() { + return AUTH_METHOD_NAME; + } + + @Override + public void configure(String encodedAuthParamString) { + if (StringUtils.isBlank(encodedAuthParamString)) { + throw new IllegalArgumentException("No authentication parameters were provided"); + } + Map<String, String> params; + try { + params = AuthenticationUtil.configureFromJsonString(encodedAuthParamString); + } catch (IOException e) { + throw new IllegalArgumentException("Malformed authentication parameters", e); + } + + String type = params.getOrDefault(CONFIG_PARAM_TYPE, TYPE_CLIENT_CREDENTIALS); + switch(type) { + case TYPE_CLIENT_CREDENTIALS: + this.flow = ClientCredentialsFlow.fromParameters(params); + break; + default: + throw new IllegalArgumentException("Unsupported authentication type: " + type); + } + } + + @Override + @Deprecated + public void configure(Map<String, String> authParams) { + throw new NotImplementedException("Deprecated; use EncodedAuthenticationParameterSupport"); + } + + @Override + public void start() throws PulsarClientException { + flow.initialize(); + } + + @Override + public synchronized AuthenticationDataProvider getAuthData() throws PulsarClientException { + if (this.cachedToken == null || this.cachedToken.isExpired()) { + TokenResult tr = this.flow.authenticate(); + 
this.cachedToken = new CachedToken(tr); + } + return this.cachedToken.getAuthData(); + } + + @Override + public void close() throws IOException { + flow.close(); + } + + @Data + class CachedToken { + private final TokenResult latest; + private final Instant expiresAt; + private final AuthenticationDataOAuth2 authData; + + public CachedToken(TokenResult latest) { + this.latest = latest; + int adjustedExpiresIn = (int) (latest.getExpiresIn() * EXPIRY_ADJUSTMENT); + this.expiresAt = AuthenticationOAuth2.this.clock.instant().plusSeconds(adjustedExpiresIn); + this.authData = new AuthenticationDataOAuth2(latest.getAccessToken()); + } + + public boolean isExpired() { + return AuthenticationOAuth2.this.clock.instant().isAfter(this.expiresAt); + } + } +} + diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java new file mode 100644 index 0000000..13bf0f5 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl.auth.oauth2; + +import org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchangeRequest; +import org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchanger; +import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenClient; +import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenExchangeException; +import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLConnection; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import lombok.Builder; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.entity.ContentType; +import org.apache.pulsar.client.api.PulsarClientException; + +/** + * Implementation of OAuth 2.0 Client Credentials flow. + * + * @see <a href="https://tools.ietf.org/html/rfc6749#section-4.4">OAuth 2.0 RFC 6749, section 4.4</a> + */ +@Slf4j +class ClientCredentialsFlow extends FlowBase { + public static final String CONFIG_PARAM_ISSUER_URL = "issuerUrl"; + public static final String CONFIG_PARAM_AUDIENCE = "audience"; + public static final String CONFIG_PARAM_KEY_FILE = "privateKey"; + + private static final long serialVersionUID = 1L; + + private final String audience; + private final String privateKey; + + private transient ClientCredentialsExchanger exchanger; + + @Builder + public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey) { + super(issuerUrl); + this.audience = audience; + this.privateKey = privateKey; + } + + @Override + public void initialize() throws PulsarClientException { + super.initialize(); + assert this.metadata != null; + + URL tokenUrl = this.metadata.getTokenEndpoint(); + this.exchanger = new TokenClient(tokenUrl); + } + + public TokenResult authenticate() throws PulsarClientException { + 
// read the private key from storage + KeyFile keyFile; + try { + keyFile = loadPrivateKey(this.privateKey); + } catch (IOException e) { + throw new PulsarClientException.AuthenticationException("Unable to read private key: " + e.getMessage()); + } + + // request an access token using client credentials + ClientCredentialsExchangeRequest req = ClientCredentialsExchangeRequest.builder() + .clientId(keyFile.getClientId()) + .clientSecret(keyFile.getClientSecret()) + .audience(this.audience) + .build(); + TokenResult tr; + try { + tr = this.exchanger.exchangeClientCredentials(req); + } catch (TokenExchangeException | IOException e) { + throw new PulsarClientException.AuthenticationException("Unable to obtain an access token: " + + e.getMessage()); + } + + return tr; + } + + @Override + public void close() { + exchanger.close(); + } + + /** + * Constructs a {@link ClientCredentialsFlow} from configuration parameters. + * @param params + * @return + */ + public static ClientCredentialsFlow fromParameters(Map<String, String> params) { + URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL); + String audience = parseParameterString(params, CONFIG_PARAM_AUDIENCE); + String privateKeyUrl = parseParameterString(params, CONFIG_PARAM_KEY_FILE); + return ClientCredentialsFlow.builder() + .issuerUrl(issuerUrl) + .audience(audience) + .privateKey(privateKeyUrl) + .build(); + } + + /** + * Loads the private key from the given URL. 
+ * @param privateKeyURL + * @return + * @throws IOException + */ + private static KeyFile loadPrivateKey(String privateKeyURL) throws IOException { + try { + URLConnection urlConnection = new org.apache.pulsar.client.api.url.URL(privateKeyURL).openConnection(); + + String protocol = urlConnection.getURL().getProtocol(); + String contentType = urlConnection.getContentType(); + if ("data".equals(protocol) && !ContentType.APPLICATION_JSON.getMimeType().equals(contentType)) { + throw new IllegalArgumentException( + "Unsupported media type or encoding format: " + urlConnection.getContentType()); + } + KeyFile privateKey; + try (Reader r = new InputStreamReader((InputStream) urlConnection.getContent(), StandardCharsets.UTF_8)) { + privateKey = KeyFile.fromJson(r); + } + return privateKey; + } catch (URISyntaxException | InstantiationException | IllegalAccessException e) { + throw new IOException("Invalid privateKey format", e); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java new file mode 100644 index 0000000..b572325 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
+import java.io.Serializable;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * An OAuth 2.0 authorization flow.
+ *
+ * <p>The interface defines three steps: {@link #initialize()}, {@link #authenticate()}
+ * and {@link #close()}. Implementations are {@link Serializable}.
+ */
+interface Flow extends Serializable {
+
+    /**
+     * Initializes the authorization flow.
+     * @throws PulsarClientException if the flow could not be initialized.
+     */
+    void initialize() throws PulsarClientException;
+
+    /**
+     * Acquires an access token from the OAuth 2.0 authorization server.
+     * @return a token result including an access token and optionally a refresh token.
+     * @throws PulsarClientException if authentication failed.
+     */
+    TokenResult authenticate() throws PulsarClientException;
+
+    /**
+     * Closes the authorization flow.
+     * Declares no checked exception.
+     */
+    void close();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
new file mode 100644
index 0000000..0f47121
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2; + +import org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver; +import org.apache.pulsar.client.impl.auth.oauth2.protocol.Metadata; +import org.apache.pulsar.client.impl.auth.oauth2.protocol.MetadataResolver; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.client.api.PulsarClientException; + +/** + * An abstract OAuth 2.0 authorization flow. 
+ */ +@Slf4j +abstract class FlowBase implements Flow { + + private static final long serialVersionUID = 1L; + + protected final URL issuerUrl; + + protected transient Metadata metadata; + + protected FlowBase(URL issuerUrl) { + this.issuerUrl = issuerUrl; + } + + public void initialize() throws PulsarClientException { + try { + this.metadata = createMetadataResolver().resolve(); + } catch (IOException e) { + log.error("Unable to retrieve OAuth 2.0 server metadata", e); + throw new PulsarClientException.AuthenticationException("Unable to retrieve OAuth 2.0 server metadata"); + } + } + + protected MetadataResolver createMetadataResolver() { + return DefaultMetadataResolver.fromIssuerUrl(issuerUrl); + } + + static String parseParameterString(Map<String, String> params, String name) { + String s = params.get(name); + if (StringUtils.isEmpty(s)) { + throw new IllegalArgumentException("Required configuration parameter: " + name); + } + return s; + } + + static URL parseParameterUrl(Map<String, String> params, String name) { + String s = params.get(name); + if (StringUtils.isEmpty(s)) { + throw new IllegalArgumentException("Required configuration parameter: " + name); + } + try { + return new URL(s); + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Malformed configuration parameter: " + name); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java new file mode 100644 index 0000000..b4a6510 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.Reader; +import lombok.Data; +import lombok.NoArgsConstructor; + + +/** + * A JSON object representing a credentials file. + */ +@Data +@NoArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +public class KeyFile { + + private static ObjectMapper objectMapper = new ObjectMapper(); + + @JsonProperty("type") + private String type; + + @JsonProperty("client_id") + private String clientId; + + @JsonProperty("client_secret") + private String clientSecret; + + @JsonProperty("client_email") + private String clientEmail; + + @JsonProperty("issuer_url") + private String issuerUrl; + + public String toJson() throws IOException { + return objectMapper.writeValueAsString(this); + } + + public static KeyFile fromJson(String value) throws IOException { + return objectMapper.readValue(value, KeyFile.class); + } + + public static KeyFile fromJson(Reader value) throws IOException { + return objectMapper.readValue(value, KeyFile.class); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Readme.md b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Readme.md new file 
mode 100644
index 0000000..cca7973
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Readme.md
@@ -0,0 +1,94 @@
+# Pulsar Client Authentication Plugin for OAuth 2.0
+
+Pulsar supports authenticating clients using OAuth 2.0 access tokens.
+
+You can use tokens to identify a Pulsar client and associate it with a "principal" (or "role") that is permitted
+to perform certain actions (e.g., publish to a topic or consume from a topic).
+
+This module provides the Pulsar client authentication plugin for OAuth 2.0. After communicating with the OAuth 2.0 server,
+the client obtains an `access token` from the OAuth 2.0 server and passes this `access token` to the Pulsar broker for authentication.
+The broker side can therefore still use `org.apache.pulsar.broker.authentication.AuthenticationProviderToken`;
+users can also add their own `AuthenticationProvider` to work with this module.
+
+## Provider Configuration
+This library allows you to authenticate using an access token that is obtained from an OAuth 2.0 authorization service,
+which acts as a _token issuer_.
+
+### Authentication Types
+The authentication type determines how to obtain an access token via an OAuth 2.0 authorization flow.
+
+#### Client Credentials
+The following parameters are supported:
+
+| Parameter | Description | Example |
+|---|---|---|
+| `type` | OAuth 2.0 auth type. Optional. | default: `client_credentials` |
+| `issuerUrl` | URL of the provider which allows Pulsar to obtain an access token. Required. | `https://accounts.google.com` |
+| `privateKey` | URL to a JSON credentials file (see below). Required. | See "Supported Pattern Formats" |
+| `audience` | An OAuth 2.0 "resource server" identifier for the Pulsar cluster. Required.
| `https://broker.example.com` |
+
+### Supported Pattern Formats of `privateKey`
+The `privateKey` parameter supports the following three URL formats; the referenced content holds the client credentials:
+
+- `file:///path/to/file`
+- `file:/path/to/file`
+- `data:application/json;base64,<base64-encoded value>`
+
+The credentials file contains service account credentials for use with the Client Credentials authentication type.
+
+An example credentials file, `credentials_file.json`:
+```json
+{
+  "type": "client_credentials",
+  "client_id": "d9ZyX97q1ef8Cr81WHVC4hFQ64vSlDK3",
+  "client_secret": "on1uJ...k6F6R",
+  "client_email": "1234567890-abcdefghijklmnopqrstuvw...@developer.gserviceaccount.com",
+  "issuer_url": "https://accounts.google.com"
+}
+```
+
+The default type is `client_credentials`; for this type, the fields `client_id` and `client_secret` are required.
+
+### Example for a typical original Oauth2 request mapping
+
+A typical OAuth 2.0 request, which is used to get an access token from the OAuth 2.0 server, looks like this:
+
+```bash
+curl --request POST \
+  --url https://dev-kt-aa9ne.us.auth0.com/oauth/token \
+  --header 'content-type: application/json' \
+  --data '{
+  "client_id":"Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
+  "client_secret":"rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb",
+  "audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/",
+  "grant_type":"client_credentials"}'
+```
+
+In this request:
+- The `issuerUrl` parameter in this plugin is mapped to `--url https://dev-kt-aa9ne.us.auth0.com/oauth/token` (the plugin reads `/.well-known/openid-configuration` from this URL's scheme, host and port, and uses the `token_endpoint` advertised there).
+- The file referenced by the `privateKey` parameter in this plugin must contain at least the fields `client_id` and `client_secret`.
+- The `audience` parameter in this plugin is mapped to `"audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/"`.
+
+## Pulsar Client Config
+You can use the provider with the following Pulsar clients.
+
+### Java
+You can use the factory method:
+```java
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar://broker.example.com:6650/")
+    .authentication(
+        AuthenticationFactoryOAuth2.clientCredentials(this.issuerUrl, this.credentialsUrl, this.audience))
+    .build();
+```
+
+Similarly, you can use JSON-encoded parameters:
+```java
+Authentication auth = AuthenticationFactory
+    .create(AuthenticationOAuth2.class.getName(), "{\"type\":\"client_credentials\",\"privateKey\":\"...\",\"issuerUrl\":\"...\",\"audience\":\"...\"}");
+
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar://broker.example.com:6650/")
+    .authentication(auth)
+    .build();
+```
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/package-info.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/package-info.java
new file mode 100644
index 0000000..3beeda0
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.pulsar.client.impl.auth.oauth2; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java new file mode 100644 index 0000000..7c14296 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2.protocol; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Builder; +import lombok.Data; + +/** + * A token request based on the exchange of client credentials. 
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc6749#section-4.4">OAuth 2.0 RFC 6749, section 4.4</a>
+ */
+@Data
+@Builder
+public class ClientCredentialsExchangeRequest {
+
+    /** Client identifier, mapped to the {@code client_id} property. */
+    @JsonProperty("client_id")
+    private String clientId;
+
+    /** Client secret, mapped to the {@code client_secret} property. */
+    @JsonProperty("client_secret")
+    private String clientSecret;
+
+    /** Identifier of the resource the token is requested for, mapped to the {@code audience} property. */
+    @JsonProperty("audience")
+    private String audience;
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
new file mode 100644
index 0000000..e6a956a
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import java.io.IOException;
+
+/**
+ * An interface for exchanging client credentials for an access token.
+ */
+public interface ClientCredentialsExchanger {
+    /**
+     * Requests an exchange of client credentials for an access token.
+     * @param req the request details.
+     * @return an access token.
+     * @throws TokenExchangeException if the OAuth server returned a detailed error.
+     * @throws IOException if a general IO error occurred.
+     */
+    TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req)
+            throws TokenExchangeException, IOException;
+
+    /**
+     * Closes the exchanger.
+     * Declares no checked exception.
+     */
+    void close();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java
new file mode 100644
index 0000000..d16ce8b
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.pulsar.client.impl.auth.oauth2.protocol; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLConnection; +import java.time.Duration; + +/** + * Resolves OAuth 2.0 authorization server metadata as described in RFC 8414. + */ +public class DefaultMetadataResolver implements MetadataResolver { + + private final URL metadataUrl; + private final ObjectReader objectReader; + private Duration connectTimeout; + private Duration readTimeout; + + public DefaultMetadataResolver(URL metadataUrl) { + this.metadataUrl = metadataUrl; + this.objectReader = new ObjectMapper().readerFor(Metadata.class); + } + + public DefaultMetadataResolver withConnectTimeout(Duration connectTimeout) { + this.connectTimeout = connectTimeout; + return this; + } + + public DefaultMetadataResolver withReadTimeout(Duration readTimeout) { + this.readTimeout = readTimeout; + return this; + } + + /** + * Resolves the authorization metadata. + * @return metadata + * @throws IOException if the metadata could not be resolved. + */ + public Metadata resolve() throws IOException { + try { + URLConnection c = this.metadataUrl.openConnection(); + if (connectTimeout != null) { + c.setConnectTimeout((int) connectTimeout.toMillis()); + } + if (readTimeout != null) { + c.setReadTimeout((int) readTimeout.toMillis()); + } + c.setRequestProperty("Accept", "application/json"); + + Metadata metadata; + try (InputStream inputStream = c.getInputStream()) { + metadata = this.objectReader.readValue(inputStream); + } + return metadata; + + } catch (IOException e) { + throw new IOException("Cannot obtain authorization metadata from " + metadataUrl.toString(), e); + } + } + + /** + * Gets a well-known metadata URL for the given OAuth issuer URL. 
+ * @param issuerUrl The authorization server's issuer identifier + * @return a resolver + */ + public static DefaultMetadataResolver fromIssuerUrl(URL issuerUrl) { + return new DefaultMetadataResolver(getWellKnownMetadataUrl(issuerUrl)); + } + + /** + * Gets a well-known metadata URL for the given OAuth issuer URL. + * @see <a href="https://tools.ietf.org/id/draft-ietf-oauth-discovery-08.html#ASConfig"> + * OAuth Discovery: Obtaining Authorization Server Metadata</a> + * @param issuerUrl The authorization server's issuer identifier + * @return a URL + */ + public static URL getWellKnownMetadataUrl(URL issuerUrl) { + try { + return new URL(issuerUrl, "/.well-known/openid-configuration"); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/Metadata.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/Metadata.java new file mode 100644 index 0000000..93f65be --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/Metadata.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URL;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Represents OAuth 2.0 Server Metadata.
+ *
+ * <p>Each field is bound to the JSON property named in its annotation; unknown
+ * properties in the document are ignored.
+ */
+@Data
+@NoArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Metadata {
+    /** Value of the {@code issuer} property. */
+    @JsonProperty("issuer")
+    private URL issuer;
+
+    /** Value of the {@code authorization_endpoint} property. */
+    @JsonProperty("authorization_endpoint")
+    private URL authorizationEndpoint;
+
+    /** Value of the {@code token_endpoint} property. */
+    @JsonProperty("token_endpoint")
+    private URL tokenEndpoint;
+
+    /** Value of the {@code userinfo_endpoint} property. */
+    @JsonProperty("userinfo_endpoint")
+    private URL userInfoEndpoint;
+
+    /** Value of the {@code revocation_endpoint} property. */
+    @JsonProperty("revocation_endpoint")
+    private URL revocationEndpoint;
+
+    /** Value of the {@code jwks_uri} property. */
+    @JsonProperty("jwks_uri")
+    private URL jwksUri;
+
+    /** Value of the {@code device_authorization_endpoint} property. */
+    @JsonProperty("device_authorization_endpoint")
+    private URL deviceAuthorizationEndpoint;
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/MetadataResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/MetadataResolver.java
new file mode 100644
index 0000000..85a6a0b
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/MetadataResolver.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import java.io.IOException;
+
+/**
+ * Resolves OAuth 2.0 authorization server metadata as described in RFC 8414.
+ */
+public interface MetadataResolver {
+    /**
+     * Resolves the authorization server metadata.
+     * @return the resolved metadata.
+     * @throws IOException if the metadata could not be resolved.
+     */
+    Metadata resolve() throws IOException;
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
new file mode 100644
index 0000000..715579d
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.pulsar.client.impl.auth.oauth2.protocol; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import org.apache.http.Consts; +import org.apache.http.HttpEntity; +import org.apache.http.NameValuePair; +import org.apache.http.StatusLine; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.HttpResponseException; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.message.BasicNameValuePair; +import org.apache.http.util.EntityUtils; + +/** + * A client for an OAuth 2.0 token endpoint. + */ +public class TokenClient implements AutoCloseable, ClientCredentialsExchanger { + + private static final ObjectReader resultReader; + private static final ObjectReader errorReader; + + static { + resultReader = new ObjectMapper().readerFor(TokenResult.class); + errorReader = new ObjectMapper().readerFor(TokenError.class); + } + + private final URL tokenUrl; + private final CloseableHttpClient httpclient; + + public TokenClient(URL tokenUrl) { + this.tokenUrl = tokenUrl; + this.httpclient = HttpClientBuilder.create().useSystemProperties().disableCookieManagement().build(); + } + + public void close() { + } + + /** + * Performs a token exchange using client credentials. + * @param req the client credentials request details. 
+ * @return a token result + * @throws TokenExchangeException + */ + public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req) + throws TokenExchangeException, IOException { + List<NameValuePair> params = new ArrayList<>(4); + params.add(new BasicNameValuePair("grant_type", "client_credentials")); + params.add(new BasicNameValuePair("client_id", req.getClientId())); + params.add(new BasicNameValuePair("client_secret", req.getClientSecret())); + params.add(new BasicNameValuePair("audience", req.getAudience())); + HttpPost post = new HttpPost(tokenUrl.toString()); + post.setHeader("Accept", ContentType.APPLICATION_JSON.getMimeType()); + post.setEntity(new UrlEncodedFormEntity(params, Consts.UTF_8)); + + try (CloseableHttpResponse response = httpclient.execute(post)) { + StatusLine status = response.getStatusLine(); + HttpEntity entity = response.getEntity(); + try { + switch(status.getStatusCode()) { + case HttpURLConnection.HTTP_OK: + return readResponse(entity, resultReader); + case HttpURLConnection.HTTP_BAD_REQUEST: + case HttpURLConnection.HTTP_UNAUTHORIZED: + throw new TokenExchangeException(readResponse(entity, errorReader)); + default: + throw new HttpResponseException(status.getStatusCode(), status.getReasonPhrase()); + } + } finally { + EntityUtils.consume(entity); + } + } + } + + private static <T> T readResponse(HttpEntity entity, ObjectReader objectReader) throws IOException { + ContentType contentType = ContentType.getOrDefault(entity); + if (!ContentType.APPLICATION_JSON.getMimeType().equals(contentType.getMimeType())) { + throw new ClientProtocolException("Unsupported content type: " + contentType.getMimeType()); + } + Charset charset = contentType.getCharset(); + if (charset == null) { + charset = StandardCharsets.UTF_8; + } + try (Reader reader = new InputStreamReader(entity.getContent(), charset)) { + @SuppressWarnings("unchecked") T obj = (T) objectReader.readValue(reader); + return obj; + } + } +} diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenError.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenError.java new file mode 100644 index 0000000..5f050a1 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenError.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2.protocol; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Represents an error returned from an OAuth 2.0 token endpoint. 
+ */ +@Data +@NoArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +public class TokenError { + @JsonProperty("error") + private String error; + + @JsonProperty("error_description") + private String errorDescription; + + @JsonProperty("error_uri") + private String errorUri; +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenExchangeException.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenExchangeException.java new file mode 100644 index 0000000..286051f --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenExchangeException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2.protocol; + +/** + * Indicates a token exchange failure. 
+ */ +public class TokenExchangeException extends Exception { + private TokenError error; + + public TokenExchangeException(TokenError error) { + super(String.format("%s (%s)", error.getErrorDescription(), error.getError())); + this.error = error; + } + + public TokenError getError() { + return error; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenResult.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenResult.java new file mode 100644 index 0000000..8b333c0 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenResult.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2.protocol; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.io.Serializable; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * The result of a token exchange request. 
+ */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +public class TokenResult implements Serializable { + private static final long serialVersionUID = 1L; + + @JsonProperty("access_token") + private String accessToken; + + @JsonProperty("id_token") + private String idToken; + + @JsonProperty("refresh_token") + private String refreshToken; + + @JsonProperty("expires_in") + private int expiresIn; +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/package-info.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/package-info.java new file mode 100644 index 0000000..2068111 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl.auth.oauth2.protocol; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java new file mode 100644 index 0000000..f45c2c0 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl.auth.oauth2; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.HashMap; +import java.util.Map; +import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Tests {@link AuthenticationOAuth2}. + */ +public class AuthenticationOAuth2Test { + private static final String TEST_ACCESS_TOKEN = "x.y.z"; + private static final int TEST_EXPIRES_IN = 60; + + private MockClock clock; + private Flow flow; + private AuthenticationOAuth2 auth; + + @BeforeMethod + public void before() { + this.clock = new MockClock(Instant.EPOCH, ZoneOffset.UTC); + this.flow = mock(Flow.class); + this.auth = new AuthenticationOAuth2(flow, this.clock); + } + + @Test + public void testGetAuthMethodName() { + assertEquals(this.auth.getAuthMethodName(), "token"); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = ".*No.*") + public void testConfigureNoParams() throws Exception { + this.auth.configure(""); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = ".*Malformed.*") + public void testConfigureMalformed() throws Exception { + this.auth.configure("{garbage}"); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = ".*Required.*") + public void testConfigureRequired() throws Exception { + this.auth.configure("{}"); + } + + @Test + public void testConfigure() throws Exception { 
+ Map<String, String> params = new HashMap<>(); + params.put("type", "client_credentials"); + params.put("privateKey", "data:base64,e30="); + params.put("issuerUrl", "http://localhost"); + params.put("audience", "http://localhost"); + ObjectMapper mapper = new ObjectMapper(); + String authParams = mapper.writeValueAsString(params); + this.auth.configure(authParams); + assertNotNull(this.auth.flow); + } + + @Test + public void testStart() throws Exception { + this.auth.start(); + verify(this.flow).initialize(); + } + + @Test + public void testGetAuthData() throws Exception { + AuthenticationDataProvider data; + TokenResult tr = TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(TEST_EXPIRES_IN).build(); + doReturn(tr).when(this.flow).authenticate(); + data = this.auth.getAuthData(); + verify(this.flow, times(1)).authenticate(); + assertEquals(data.getCommandData(), tr.getAccessToken()); + + // cache hit + data = this.auth.getAuthData(); + verify(this.flow, times(1)).authenticate(); + assertEquals(data.getCommandData(), tr.getAccessToken()); + + // cache miss + clock.advance(Duration.ofSeconds(TEST_EXPIRES_IN)); + data = this.auth.getAuthData(); + verify(this.flow, times(2)).authenticate(); + assertEquals(data.getCommandData(), tr.getAccessToken()); + } + + @Test + public void testClose() throws Exception { + this.auth.close(); + verify(this.flow).close(); + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/MockClock.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/MockClock.java new file mode 100644 index 0000000..1e23311 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/MockClock.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * A {@link Clock} for tests: the reported instant only changes when the test
 * moves it explicitly via {@link #setInstant(Instant)} or {@link #advance(Duration)}.
 */
public class MockClock extends Clock implements Serializable {
    private static final long serialVersionUID = 1L;
    private Instant instant;
    private final ZoneId zone;

    /**
     * Creates a clock frozen at the given instant.
     * @param fixedInstant the instant reported until the clock is adjusted
     * @param zone the time zone of the clock
     */
    public MockClock(Instant fixedInstant, ZoneId zone) {
        this.zone = zone;
        this.instant = fixedInstant;
    }

    /**
     * Replaces the current instant.
     * @param fixedInstant the instant to report from now on
     */
    public void setInstant(Instant fixedInstant) {
        instant = fixedInstant;
    }

    /**
     * Moves the current instant by the given amount.
     * @param duration the amount added to the current instant
     */
    public void advance(Duration duration) {
        Instant moved = instant.plus(duration);
        instant = moved;
    }

    @Override
    public Instant instant() {
        return instant;
    }

    @Override
    public long millis() {
        return instant.toEpochMilli();
    }

    @Override
    public ZoneId getZone() {
        return zone;
    }

    /**
     * Returns this clock when the zone is unchanged; otherwise a new clock
     * holding the current instant and the requested zone.
     */
    @Override
    public Clock withZone(ZoneId zone) {
        return zone.equals(this.zone) ? this : new MockClock(instant, zone);
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof MockClock)) {
            return false;
        }
        MockClock that = (MockClock) obj;
        return instant.equals(that.instant) && zone.equals(that.zone);
    }

    @Override
    public int hashCode() {
        int instantHash = instant.hashCode();
        int zoneHash = zone.hashCode();
        return instantHash ^ zoneHash;
    }

    @Override
    public String toString() {
        return new StringBuilder("MockClock[")
                .append(instant)
                .append(",")
                .append(zone)
                .append("]")
                .toString();
    }
}