This is an automated email from the ASF dual-hosted git repository. jhelou pushed a commit to branch pulsar-authentication in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 3e2c33cca34e28df4e7b99bcb437e7fdc544ed1d Author: Jean Helou <[email protected]> AuthorDate: Thu Nov 24 23:47:46 2022 +0100 [JAMES-3687] refactors client creation to PulsarConfiguration --- .../backends/pulsar/PulsarConfiguration.scala | 30 ++++++++++- .../james/queue/pulsar/PulsarMailQueue.scala | 59 ++++++++++------------ .../queue/pulsar/PulsarMailQueueFactory.scala | 7 ++- 3 files changed, 60 insertions(+), 36 deletions(-) diff --git a/backends-common/pulsar/src/main/scala/org/apache/james/backends/pulsar/PulsarConfiguration.scala b/backends-common/pulsar/src/main/scala/org/apache/james/backends/pulsar/PulsarConfiguration.scala index 7e790f29c6..e50842821b 100644 --- a/backends-common/pulsar/src/main/scala/org/apache/james/backends/pulsar/PulsarConfiguration.scala +++ b/backends-common/pulsar/src/main/scala/org/apache/james/backends/pulsar/PulsarConfiguration.scala @@ -22,6 +22,11 @@ package org.apache.james.backends.pulsar import java.net.{URI, URISyntaxException} import org.apache.commons.configuration2.Configuration import com.google.common.base.Strings +import com.sksamuel.pulsar4s.{PulsarAsyncClient, PulsarClient, PulsarClientConfig} +import org.apache.pulsar.client.admin.PulsarAdmin +import org.apache.pulsar.client.impl.auth.{AuthenticationBasic, AuthenticationDisabled, AuthenticationToken} + +import scala.jdk.CollectionConverters.MapHasAsJava object PulsarConfiguration { val BROKER_URI_PROPERTY_NAME = "broker.uri" @@ -100,4 +105,27 @@ object Auth { case class Basic(userId: String, password: String) extends Auth } -case class PulsarConfiguration(brokerUri: String, adminUri: String, namespace: Namespace, auth: Auth = Auth.NoAuth) \ No newline at end of file +case class PulsarConfiguration(brokerUri: String, adminUri: String, namespace: Namespace, auth: Auth = Auth.NoAuth) { + private val pulsarAuth = auth match { + case Auth.NoAuth => new AuthenticationDisabled() + case Auth.Token(value) => new AuthenticationToken(value) + case Auth.Basic(userId, password) => + val basic = new AuthenticationBasic() + basic.configure(Map("userId" -> userId, "password" -> password).asJava) + basic + } + + def adminClient(): PulsarAdmin = + PulsarAdmin.builder() + .serviceHttpUrl(adminUri) + .authentication(pulsarAuth) + .build() + + def asyncClient(): PulsarAsyncClient = + PulsarClient( + PulsarClientConfig( + serviceUrl = brokerUri, + authentication = Some(pulsarAuth) + ) + ) +} \ No newline at end of file diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala index a6b352cde8..1b6e6f9b16 100644 --- a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala +++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala @@ -1,4 +1,4 @@ -/**************************************************************** +/** ************************************************************** * Licensed to the Apache Software Foundation (ASF) under one * * or more contributor license agreements. See the NOTICE file * * distributed with this work for additional information * @@ -6,23 +6,19 @@ * to you under the Apache License, Version 2.0 (the * * "License"); you may not use this file except in compliance * * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * * Unless required by applicable law or agreed to in writing, * * software distributed under the License is distributed on an * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * * KIND, either express or implied. See the License for the * * specific language governing permissions and limitations * * under the License. * - ****************************************************************/ + * ************************************************************** */ package org.apache.james.queue.pulsar -import java.time.{Instant, ZonedDateTime, Duration => JavaDuration} -import java.util.concurrent.TimeUnit -import java.util.{Date, UUID} - import akka.actor.{ActorRef, ActorSystem} import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source, SourceQueueWithComplete, StreamConverters} import akka.stream.{Attributes, OverflowStrategy} @@ -31,8 +27,6 @@ import akka.{Done, NotUsed} import com.sksamuel.pulsar4s._ import com.sksamuel.pulsar4s.akka.streams import com.sksamuel.pulsar4s.akka.streams.{CommittableMessage, Control} -import javax.mail.MessagingException -import javax.mail.internet.MimeMessage import org.apache.james.backends.pulsar.PulsarReader import org.apache.james.blob.api.{BlobId, ObjectNotFoundException, Store} import org.apache.james.blob.mail.MimeMessagePartsId @@ -44,13 +38,17 @@ import org.apache.james.queue.api._ import org.apache.james.queue.pulsar.EnqueueId.EnqueueId import org.apache.james.server.core.MailImpl import org.apache.mailet._ -import org.apache.pulsar.client.admin.PulsarAdmin import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException import org.apache.pulsar.client.api.{Schema, SubscriptionInitialPosition, SubscriptionType} import org.reactivestreams.Publisher import org.slf4j.LoggerFactory import play.api.libs.json._ +import java.time.{Instant, ZonedDateTime, Duration => JavaDuration} +import java.util.concurrent.TimeUnit +import java.util.{Date, UUID} +import javax.mail.MessagingException +import javax.mail.internet.MimeMessage import scala.concurrent._ import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ @@ -85,14 +83,14 @@ private[pulsar] object schemas { * A filter cannot remove messages that are enqueued after the call to the `remove` method. */ class PulsarMailQueue( - config: PulsarMailQueueConfiguration, - blobIdFactory: BlobId.Factory, - mimeMessageStore: Store[MimeMessage, MimeMessagePartsId], - mailQueueItemDecoratorFactory: MailQueueItemDecoratorFactory, - metricFactory: MetricFactory, - gaugeRegistry: GaugeRegistry, - system: ActorSystem -) extends MailQueue with ManageableMailQueue { + config: PulsarMailQueueConfiguration, + blobIdFactory: BlobId.Factory, + mimeMessageStore: Store[MimeMessage, MimeMessagePartsId], + mailQueueItemDecoratorFactory: MailQueueItemDecoratorFactory, + metricFactory: MetricFactory, + gaugeRegistry: GaugeRegistry, + system: ActorSystem + ) extends MailQueue with ManageableMailQueue { import schemas._ import serializers._ @@ -110,11 +108,8 @@ class PulsarMailQueue( private implicit val implicitSystem: ActorSystem = system private implicit val ec: ExecutionContextExecutor = system.dispatcher private implicit val implicitBlobIdFactory: BlobId.Factory = blobIdFactory - private implicit val client: PulsarAsyncClient = PulsarClient(config.pulsar.brokerUri) - private val admin = { - val builder = PulsarAdmin.builder() - builder.serviceHttpUrl(config.pulsar.adminUri).build() - } + private implicit val client: PulsarAsyncClient = config.pulsar.asyncClient() + private val admin = config.pulsar.adminClient() private val outTopic = Topic(s"persistent://${config.pulsar.namespace.asString}/James-${config.name.asString()}") private val scheduledTopic = Topic(s"persistent://${config.pulsar.namespace.asString}/${config.name.asString()}-scheduled") @@ -212,7 +207,7 @@ class PulsarMailQueue( private val filterScheduledStage: ActorRef = system.actorOf(FilterStage.props) private val requeueMessage = Flow.apply[CommittableMessage[String]] .via(filteringFlow(filterScheduledStage)) - .flatMapConcat{case (_,_,message) => Source.future(requeue.offer(ProducerMessage(message.message.value)).map(_ => message))} + .flatMapConcat { case (_, _, message) => Source.future(requeue.offer(ProducerMessage(message.message.value)).map(_ => message)) } .flatMapConcat(message => Source.future(message.ack(cumulative = false))) .toMat(Sink.ignore)(Keep.none) @@ -244,7 +239,7 @@ class PulsarMailQueue( .toMat(Sink.asPublisher[MailQueue.MailQueueItem](true).withAttributes(Attributes.inputBuffer(initial = 1, max = 1)))(Keep.both) } - private def filteringFlow(filterActor:ActorRef) = { + private def filteringFlow(filterActor: ActorRef) = { implicit val timeout: Timeout = Timeout(1, TimeUnit.SECONDS) Flow.apply[CommittableMessage[String]].map(message => (Json.fromJson[MailMetadata](Json.parse(message.message.value)).get, @@ -261,7 +256,7 @@ class PulsarMailQueue( val partsId = metadata.partsId Source .fromPublisher(readMimeMessage(partsId)) - .collect{ case Some(message) => message } + .collect { case Some(message) => message } .map(message => (readMail(metadata, message), partsId, committableMessage)) } } @@ -325,7 +320,7 @@ class PulsarMailQueue( * * @see [[FilterStage]] */ - private def filtersCommandFlow(topic:Topic, filterSubscription: Subscription, filteringStage: ActorRef) = { + private def filtersCommandFlow(topic: Topic, filterSubscription: Subscription, filteringStage: ActorRef) = { val logInvalidFilterPayload = Flow.apply[JsResult[Filter]] .collectType[JsError] .map(error => "unable to parse filter" + Json.prettyPrint(JsError.toJson(error))) @@ -543,11 +538,12 @@ class PulsarMailQueue( * This is reliant on the FilterStage implementation being able to deduplicate * filters. The current implementation defined filters as value objects and stores * them in a Set which will effectively dedpulicate them. + * * @see org.apache.james.queue.pulsar.FilterStage.filters * @param producer * @param filter */ - private def publishFilter(producer:Producer[String])(filter:Filter): Unit ={ + private def publishFilter(producer: Producer[String])(filter: Filter): Unit = { import Filter._ // Optimizes for the local/single instance case, the duplicated filter // received through pulsar will be eliminated by the filter stage as @@ -586,13 +582,14 @@ class PulsarMailQueue( .bodyBlobId(blobIdFactory.from(metadata.bodyBlobId)) .build() Source.fromPublisher(readMimeMessage(partsId)) - .collect{ case Some(message) => message } + .collect { case Some(message) => message } .map(message => readMail(metadata, message)) }) new ManageableMailQueue.MailQueueIterator() { private val javaStream = browseableMails.runWith(StreamConverters.asJavaStream[Mail]()) private val iterator = javaStream.iterator() + /** * @inheritdoc */ diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala index fda7581ac2..536f8575cb 100644 --- a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala +++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala @@ -20,12 +20,13 @@ package org.apache.james.queue.pulsar import akka.actor.ActorSystem -import org.apache.james.backends.pulsar.PulsarConfiguration +import org.apache.james.backends.pulsar.{Auth, PulsarConfiguration} import org.apache.james.blob.api.{BlobId, Store} import org.apache.james.blob.mail.MimeMessagePartsId import org.apache.james.metrics.api.{GaugeRegistry, MetricFactory} import org.apache.james.queue.api.{MailQueueFactory, MailQueueItemDecoratorFactory, MailQueueName} import org.apache.pulsar.client.admin.PulsarAdmin +import org.apache.pulsar.client.impl.auth.{AuthenticationBasic, AuthenticationToken} import java.util import java.util.Optional @@ -45,9 +46,7 @@ class PulsarMailQueueFactory @Inject()(pulsarConfiguration: PulsarConfiguration, gaugeRegistry: GaugeRegistry ) extends MailQueueFactory[PulsarMailQueue] { private val queues: AtomicReference[Map[MailQueueName, PulsarMailQueue]] = new AtomicReference(Map.empty) - private val admin = - PulsarAdmin.builder().serviceHttpUrl(pulsarConfiguration.adminUri).build() - + private val admin = pulsarConfiguration.adminClient() private val system: ActorSystem = ActorSystem("pulsar-mailqueue") @PreDestroy --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
