This is an automated email from the ASF dual-hosted git repository.

jhelou pushed a commit to branch pulsar-authentication
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3e2c33cca34e28df4e7b99bcb437e7fdc544ed1d
Author: Jean Helou <[email protected]>
AuthorDate: Thu Nov 24 23:47:46 2022 +0100

    [JAMES-3687] refactors client creation to PulsarConfiguration
---
 .../backends/pulsar/PulsarConfiguration.scala      | 30 ++++++++++-
 .../james/queue/pulsar/PulsarMailQueue.scala       | 59 ++++++++++------------
 .../queue/pulsar/PulsarMailQueueFactory.scala      |  7 ++-
 3 files changed, 60 insertions(+), 36 deletions(-)

diff --git 
a/backends-common/pulsar/src/main/scala/org/apache/james/backends/pulsar/PulsarConfiguration.scala
 
b/backends-common/pulsar/src/main/scala/org/apache/james/backends/pulsar/PulsarConfiguration.scala
index 7e790f29c6..e50842821b 100644
--- 
a/backends-common/pulsar/src/main/scala/org/apache/james/backends/pulsar/PulsarConfiguration.scala
+++ 
b/backends-common/pulsar/src/main/scala/org/apache/james/backends/pulsar/PulsarConfiguration.scala
@@ -22,6 +22,11 @@ package org.apache.james.backends.pulsar
 import java.net.{URI, URISyntaxException}
 import org.apache.commons.configuration2.Configuration
 import com.google.common.base.Strings
+import com.sksamuel.pulsar4s.{PulsarAsyncClient, PulsarClient, 
PulsarClientConfig}
+import org.apache.pulsar.client.admin.PulsarAdmin
+import org.apache.pulsar.client.impl.auth.{AuthenticationBasic, 
AuthenticationDisabled, AuthenticationToken}
+
+import scala.jdk.CollectionConverters.MapHasAsJava
 
 object PulsarConfiguration {
   val BROKER_URI_PROPERTY_NAME = "broker.uri"
@@ -100,4 +105,27 @@ object Auth {
   case class Basic(userId: String, password: String) extends Auth
 }
 
-case class PulsarConfiguration(brokerUri: String, adminUri: String, namespace: 
Namespace, auth: Auth = Auth.NoAuth)
\ No newline at end of file
+case class PulsarConfiguration(brokerUri: String, adminUri: String, namespace: 
Namespace, auth: Auth = Auth.NoAuth) {
+  private val pulsarAuth = auth match {
+    case Auth.NoAuth => new AuthenticationDisabled()
+    case Auth.Token(value) => new AuthenticationToken(value)
+    case Auth.Basic(userId, password) =>
+      val basic = new AuthenticationBasic()
+      basic.configure(Map("userId" -> userId, "password" -> password).asJava)
+      basic
+  }
+
+  def adminClient(): PulsarAdmin =
+    PulsarAdmin.builder()
+      .serviceHttpUrl(adminUri)
+      .authentication(pulsarAuth)
+      .build()
+
+  def asyncClient(): PulsarAsyncClient =
+    PulsarClient(
+      PulsarClientConfig(
+        serviceUrl = brokerUri,
+        authentication = Some(pulsarAuth)
+      )
+    )
+}
\ No newline at end of file
diff --git 
a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
 
b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
index a6b352cde8..1b6e6f9b16 100644
--- 
a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
+++ 
b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
@@ -1,4 +1,4 @@
-/****************************************************************
+/** **************************************************************
  * Licensed to the Apache Software Foundation (ASF) under one   *
  * or more contributor license agreements.  See the NOTICE file *
  * distributed with this work for additional information        *
@@ -6,23 +6,19 @@
  * to you under the Apache License, Version 2.0 (the            *
  * "License"); you may not use this file except in compliance   *
  * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                 *
+ * *
  * Unless required by applicable law or agreed to in writing,   *
  * software distributed under the License is distributed on an  *
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
  * KIND, either express or implied.  See the License for the    *
  * specific language governing permissions and limitations      *
  * under the License.                                           *
- ****************************************************************/
+ * ************************************************************** */
 
 package org.apache.james.queue.pulsar
 
-import java.time.{Instant, ZonedDateTime, Duration => JavaDuration}
-import java.util.concurrent.TimeUnit
-import java.util.{Date, UUID}
-
 import akka.actor.{ActorRef, ActorSystem}
 import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source, 
SourceQueueWithComplete, StreamConverters}
 import akka.stream.{Attributes, OverflowStrategy}
@@ -31,8 +27,6 @@ import akka.{Done, NotUsed}
 import com.sksamuel.pulsar4s._
 import com.sksamuel.pulsar4s.akka.streams
 import com.sksamuel.pulsar4s.akka.streams.{CommittableMessage, Control}
-import javax.mail.MessagingException
-import javax.mail.internet.MimeMessage
 import org.apache.james.backends.pulsar.PulsarReader
 import org.apache.james.blob.api.{BlobId, ObjectNotFoundException, Store}
 import org.apache.james.blob.mail.MimeMessagePartsId
@@ -44,13 +38,17 @@ import org.apache.james.queue.api._
 import org.apache.james.queue.pulsar.EnqueueId.EnqueueId
 import org.apache.james.server.core.MailImpl
 import org.apache.mailet._
-import org.apache.pulsar.client.admin.PulsarAdmin
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException
 import org.apache.pulsar.client.api.{Schema, SubscriptionInitialPosition, 
SubscriptionType}
 import org.reactivestreams.Publisher
 import org.slf4j.LoggerFactory
 import play.api.libs.json._
 
+import java.time.{Instant, ZonedDateTime, Duration => JavaDuration}
+import java.util.concurrent.TimeUnit
+import java.util.{Date, UUID}
+import javax.mail.MessagingException
+import javax.mail.internet.MimeMessage
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
@@ -85,14 +83,14 @@ private[pulsar] object schemas {
  * A filter cannot remove messages that are enqueued after the call to the 
`remove` method.
  */
 class PulsarMailQueue(
-  config: PulsarMailQueueConfiguration,
-  blobIdFactory: BlobId.Factory,
-  mimeMessageStore: Store[MimeMessage, MimeMessagePartsId],
-  mailQueueItemDecoratorFactory: MailQueueItemDecoratorFactory,
-  metricFactory: MetricFactory,
-  gaugeRegistry: GaugeRegistry,
-  system: ActorSystem
-) extends MailQueue with ManageableMailQueue {
+                       config: PulsarMailQueueConfiguration,
+                       blobIdFactory: BlobId.Factory,
+                       mimeMessageStore: Store[MimeMessage, 
MimeMessagePartsId],
+                       mailQueueItemDecoratorFactory: 
MailQueueItemDecoratorFactory,
+                       metricFactory: MetricFactory,
+                       gaugeRegistry: GaugeRegistry,
+                       system: ActorSystem
+                     ) extends MailQueue with ManageableMailQueue {
 
   import schemas._
   import serializers._
@@ -110,11 +108,8 @@ class PulsarMailQueue(
   private implicit val implicitSystem: ActorSystem = system
   private implicit val ec: ExecutionContextExecutor = system.dispatcher
   private implicit val implicitBlobIdFactory: BlobId.Factory = blobIdFactory
-  private implicit val client: PulsarAsyncClient = 
PulsarClient(config.pulsar.brokerUri)
-  private val admin = {
-    val builder = PulsarAdmin.builder()
-    builder.serviceHttpUrl(config.pulsar.adminUri).build()
-  }
+  private implicit val client: PulsarAsyncClient = config.pulsar.asyncClient()
+  private val admin = config.pulsar.adminClient()
 
   private val outTopic = 
Topic(s"persistent://${config.pulsar.namespace.asString}/James-${config.name.asString()}")
   private val scheduledTopic = 
Topic(s"persistent://${config.pulsar.namespace.asString}/${config.name.asString()}-scheduled")
@@ -212,7 +207,7 @@ class PulsarMailQueue(
   private val filterScheduledStage: ActorRef = 
system.actorOf(FilterStage.props)
   private val requeueMessage = Flow.apply[CommittableMessage[String]]
     .via(filteringFlow(filterScheduledStage))
-    .flatMapConcat{case (_,_,message) => 
Source.future(requeue.offer(ProducerMessage(message.message.value)).map(_ => 
message))}
+    .flatMapConcat { case (_, _, message) => 
Source.future(requeue.offer(ProducerMessage(message.message.value)).map(_ => 
message)) }
     .flatMapConcat(message => Source.future(message.ack(cumulative = false)))
     .toMat(Sink.ignore)(Keep.none)
 
@@ -244,7 +239,7 @@ class PulsarMailQueue(
       
.toMat(Sink.asPublisher[MailQueue.MailQueueItem](true).withAttributes(Attributes.inputBuffer(initial
 = 1, max = 1)))(Keep.both)
   }
 
-  private def filteringFlow(filterActor:ActorRef) = {
+  private def filteringFlow(filterActor: ActorRef) = {
     implicit val timeout: Timeout = Timeout(1, TimeUnit.SECONDS)
     Flow.apply[CommittableMessage[String]].map(message =>
       (Json.fromJson[MailMetadata](Json.parse(message.message.value)).get,
@@ -261,7 +256,7 @@ class PulsarMailQueue(
           val partsId = metadata.partsId
           Source
             .fromPublisher(readMimeMessage(partsId))
-            .collect{ case Some(message) => message }
+            .collect { case Some(message) => message }
             .map(message => (readMail(metadata, message), partsId, 
committableMessage))
       }
   }
@@ -325,7 +320,7 @@ class PulsarMailQueue(
    *
    * @see [[FilterStage]]
    */
-  private def filtersCommandFlow(topic:Topic, filterSubscription: 
Subscription, filteringStage: ActorRef) = {
+  private def filtersCommandFlow(topic: Topic, filterSubscription: 
Subscription, filteringStage: ActorRef) = {
     val logInvalidFilterPayload = Flow.apply[JsResult[Filter]]
       .collectType[JsError]
       .map(error => "unable to parse filter" + 
Json.prettyPrint(JsError.toJson(error)))
@@ -543,11 +538,12 @@ class PulsarMailQueue(
    * This is reliant on the FilterStage implementation being able to 
deduplicate
    * filters. The current implementation defined filters as value objects and 
stores
    * them in a Set which will effectively dedpulicate them.
+   *
    * @see org.apache.james.queue.pulsar.FilterStage.filters
    * @param producer
    * @param filter
    */
-  private def publishFilter(producer:Producer[String])(filter:Filter): Unit ={
+  private def publishFilter(producer: Producer[String])(filter: Filter): Unit 
= {
     import Filter._
     // Optimizes for the local/single instance case, the duplicated filter
     // received through pulsar will be eliminated by the filter stage as
@@ -586,13 +582,14 @@ class PulsarMailQueue(
           .bodyBlobId(blobIdFactory.from(metadata.bodyBlobId))
           .build()
         Source.fromPublisher(readMimeMessage(partsId))
-          .collect{ case Some(message) => message }
+          .collect { case Some(message) => message }
           .map(message => readMail(metadata, message))
       })
 
     new ManageableMailQueue.MailQueueIterator() {
       private val javaStream = 
browseableMails.runWith(StreamConverters.asJavaStream[Mail]())
       private val iterator = javaStream.iterator()
+
       /**
        * @inheritdoc
        */
diff --git 
a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala
 
b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala
index fda7581ac2..536f8575cb 100644
--- 
a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala
+++ 
b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala
@@ -20,12 +20,13 @@
 package org.apache.james.queue.pulsar
 
 import akka.actor.ActorSystem
-import org.apache.james.backends.pulsar.PulsarConfiguration
+import org.apache.james.backends.pulsar.{Auth, PulsarConfiguration}
 import org.apache.james.blob.api.{BlobId, Store}
 import org.apache.james.blob.mail.MimeMessagePartsId
 import org.apache.james.metrics.api.{GaugeRegistry, MetricFactory}
 import org.apache.james.queue.api.{MailQueueFactory, 
MailQueueItemDecoratorFactory, MailQueueName}
 import org.apache.pulsar.client.admin.PulsarAdmin
+import org.apache.pulsar.client.impl.auth.{AuthenticationBasic, 
AuthenticationToken}
 
 import java.util
 import java.util.Optional
@@ -45,9 +46,7 @@ class PulsarMailQueueFactory @Inject()(pulsarConfiguration: 
PulsarConfiguration,
   gaugeRegistry: GaugeRegistry
 ) extends MailQueueFactory[PulsarMailQueue] {
   private val queues: AtomicReference[Map[MailQueueName, PulsarMailQueue]] = 
new AtomicReference(Map.empty)
-  private val admin =
-    PulsarAdmin.builder().serviceHttpUrl(pulsarConfiguration.adminUri).build()
-
+  private val admin = pulsarConfiguration.adminClient()
   private val system: ActorSystem = ActorSystem("pulsar-mailqueue")
 
   @PreDestroy


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to