PG1204 commented on code in PR #5124:
URL: https://github.com/apache/texera/pull/5124#discussion_r3270304304


##########
amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala:
##########
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.resource
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import kong.unirest.Unirest
+
+import javax.ws.rs._
+import javax.ws.rs.core.{MediaType, Response}
+import java.nio.file.{Files, Path => NioPath, Paths}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.stream.Collectors
+import scala.jdk.CollectionConverters._
+
+/**
+  * REST resource that proxies the Hugging Face Hub API to list
+  * models for the HuggingFace operator.
+  *
+  * Browse mode:  GET /api/huggingface/models?task=text-generation
+  *   Fetches ALL models for the task from HF Hub (paginated internally),
+  *   caches the full list server-side, and returns it.
+  *
+  * Search mode:  GET /api/huggingface/models?task=text-generation&search=bert
+  *   Forwards the search query to HF Hub API (searches all models).
+  */
+@Path("/huggingface")
+@Produces(Array(MediaType.APPLICATION_JSON))
+class HuggingFaceModelResource {
+
+  import HuggingFaceModelResource._
+
+  @GET
+  @Path("/models")
+  def listModels(
+      @QueryParam("task") @DefaultValue("text-generation") task: String,
+      @QueryParam("search") search: String
+  ): Response = {
+    try {
+      val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("")
+
+      // ── Search mode: forward query to HF Hub, return results directly ──
+      if (search != null && search.trim.nonEmpty) {
+        return fetchSearchResults(task, search.trim, hfToken)
+      }
+
+      // ── Browse mode: return ALL models for this task (cached) ──
+      val cached = modelCache.get(task)
+      if (cached != null) {
+        return Response.ok(cached).build()
+      }
+
+      // Not cached — fetch all pages from HF Hub API
+      val allModels = fetchAllModelsForTask(task, hfToken)
+      val json = objectMapper.writeValueAsString(allModels)
+      modelCache.put(task, json)
+
+      Response.ok(json).build()
+
+    } catch {
+      case e: Exception =>
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(s"""{"error":"Failed to fetch models: ${e.getMessage}"}""")
+          .build()
+    }
+  }
+
+  @POST
+  @Path("/upload-audio")
+  @Consumes(Array(MediaType.WILDCARD))
+  def uploadAudioReference(
+      @QueryParam("filename") filename: String,
+      bytes: Array[Byte]
+  ): Response = {
+    try {
+      if (bytes == null || bytes.isEmpty) {
+        return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity("""{"error":"Audio payload is empty."}""")
+          .build()
+      }
+
+      val safeFileName = Option(filename)
+        .map(_.trim)
+        .filter(_.nonEmpty)
+        .map(name => Paths.get(name).getFileName.toString)
+        .getOrElse("audio.bin")
+      val extension = {
+        val idx = safeFileName.lastIndexOf('.')
+        if (idx >= 0 && idx < safeFileName.length - 1) 
safeFileName.substring(idx) else ".bin"
+      }
+
+      val tempDir = Paths.get(System.getProperty("java.io.tmpdir"), 
"texera-hf-audio")
+      Files.createDirectories(tempDir)
+      val tempFile: NioPath = Files.createTempFile(tempDir, "hf-audio-", 
extension)
+      Files.write(tempFile, bytes)
+
+      val json = objectMapper.writeValueAsString(
+        Map(
+          "path" -> tempFile.toAbsolutePath.toString,
+          "fileName" -> safeFileName
+        ).asJava
+      )
+      Response.ok(json).build()
+    } catch {
+      case e: Exception =>
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(s"""{"error":"Failed to upload audio: ${e.getMessage}"}""")
+          .build()
+    }
+  }
+
+  @GET
+  @Path("/audio-preview")
+  def previewUploadedAudio(@QueryParam("path") path: String): Response = {
+    try {
+      val trimmedPath = Option(path).map(_.trim).getOrElse("")
+      if (trimmedPath.isEmpty) {
+        return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity("""{"error":"Audio path is required."}""")
+          .build()
+      }
+
+      val tempDir = Paths
+        .get(System.getProperty("java.io.tmpdir"), "texera-hf-audio")
+        .toAbsolutePath
+        .normalize()
+      val requestedPath = Paths.get(trimmedPath).toAbsolutePath.normalize()
+      if (!requestedPath.startsWith(tempDir)) {
+        return Response
+          .status(Response.Status.FORBIDDEN)
+          .entity("""{"error":"Audio path is outside the allowed preview 
directory."}""")
+          .build()
+      }
+      if (!Files.exists(requestedPath) || !Files.isRegularFile(requestedPath)) 
{
+        return Response
+          .status(Response.Status.NOT_FOUND)
+          .entity("""{"error":"Uploaded audio file was not found."}""")
+          .build()
+      }
+
+      val contentType = Option(Files.probeContentType(requestedPath))
+        .filter(_.trim.nonEmpty)
+        .getOrElse(inferAudioContentType(requestedPath))
+      Response.ok(Files.readAllBytes(requestedPath), contentType).build()
+    } catch {
+      case e: Exception =>
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(s"""{"error":"Failed to read uploaded audio: 
${e.getMessage}"}""")
+          .build()
+    }
+  }
+
+  @GET
+  @Path("/media-proxy")
+  def proxyRemoteMedia(@QueryParam("url") url: String): Response = {
+    try {
+      val trimmedUrl = Option(url).map(_.trim).getOrElse("")
+      if (trimmedUrl.isEmpty) {
+        return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity("""{"error":"Media URL is required."}""")
+          .build()
+      }
+      if (!trimmedUrl.startsWith("http://";) && 
!trimmedUrl.startsWith("https://";)) {
+        return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity("""{"error":"Only http(s) media URLs are supported."}""")
+          .build()
+      }
+
+      val upstreamResponse = Unirest
+        .get(trimmedUrl)
+        .connectTimeout(10000)
+        .socketTimeout(120000)
+        .asBytes()
+
+      if (upstreamResponse.getStatus != 200) {
+        return Response
+          .status(upstreamResponse.getStatus)
+          .entity(
+            s"""{"error":"Failed to fetch remote media: 
${upstreamResponse.getStatusText}"}"""
+          )
+          .build()
+      }
+
+      val contentType = 
Option(upstreamResponse.getHeaders.getFirst("Content-Type"))
+        .filter(_.trim.nonEmpty)
+        .getOrElse(MediaType.APPLICATION_OCTET_STREAM)
+      Response.ok(upstreamResponse.getBody, contentType).build()
+    } catch {
+      case e: Exception =>
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(s"""{"error":"Failed to proxy remote media: 
${e.getMessage}"}""")
+          .build()
+    }
+  }
+
+  /** Search HF Hub for models matching a query within a task. */
+  private def fetchSearchResults(task: String, query: String, hfToken: 
String): Response = {
+    var request = Unirest
+      .get("https://huggingface.co/api/models";)
+      .queryString("pipeline_tag", task)
+      .queryString("sort", "downloads")
+      .queryString("direction", "-1")
+      .queryString("limit", "100")
+      .queryString("filter", task)
+      .queryString("inference", "warm")
+      .queryString("search", query)
+
+    if (hfToken.nonEmpty) {
+      request = request.header("Authorization", s"Bearer $hfToken")
+    }
+
+    val hfResponse = request.asString()
+
+    if (hfResponse.getStatus != 200) {
+      return Response
+        .status(hfResponse.getStatus)
+        .entity(s"""{"error":"Hugging Face API error: 
${hfResponse.getStatusText}"}""")
+        .build()
+    }
+
+    val rawModels = objectMapper.readValue(hfResponse.getBody, listOfMapsType)
+    val out = buildSimplifiedList(rawModels)
+    Response.ok(objectMapper.writeValueAsString(out)).build()
+  }
+
+  /**
+    * Fetch pipeline task tags from the Hugging Face Hub API.
+    * GET /api/huggingface/tasks
+    *
+    * Returns a JSON array of objects: [{ "tag": "text-generation", "label": 
"Text Generation" }, ...]
+    * The result is cached server-side for the lifetime of the process.
+    */
+  @GET
+  @Path("/tasks")
+  def listTasks(): Response = {
+    try {
+      val cached = taskCache.get("all")
+      if (cached != null) {
+        return Response.ok(cached).build()
+      }
+
+      val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("")
+      var request = Unirest
+        .get("https://huggingface.co/api/tasks";)
+        .connectTimeout(10000)
+        .socketTimeout(15000)
+
+      if (hfToken.nonEmpty) {
+        request = request.header("Authorization", s"Bearer $hfToken")
+      }
+
+      val hfResponse = request.asString()
+
+      if (hfResponse.getStatus != 200) {
+        return Response
+          .status(hfResponse.getStatus)
+          .entity(s"""{"error":"Hugging Face API error: 
${hfResponse.getStatusText}"}""")
+          .build()
+      }
+
+      // /api/tasks returns a JSON object: { "<pipeline_tag>": { "label": 
"...", ... }, ... }
+      // Using readTree so no entry is dropped regardless of its value type 
(null, array, etc.)
+      val root: JsonNode = objectMapper.readTree(hfResponse.getBody)
+      val taskList = new java.util.ArrayList[java.util.Map[String, Object]]()
+      val iter = root.fields()
+      while (iter.hasNext) {
+        val entry = iter.next()
+        val tag = entry.getKey
+        val info: JsonNode = entry.getValue
+        val label =
+          if (info != null && info.isObject && info.has("label")) 
info.get("label").asText(tag)
+          else tag
+        val taskEntry = new java.util.LinkedHashMap[String, Object]()
+        taskEntry.put("tag", tag)
+        taskEntry.put("label", label)
+        taskList.add(taskEntry)
+      }
+
+      // Filter out tasks that have no models available with hosted inference
+      val availableTasks = taskList
+        .parallelStream()
+        .filter(task => hasModelsForTask(task.get("tag").toString, hfToken))
+        .collect(Collectors.toList())

Review Comment:
   Points addressed:
   
   1. Bounded concurrency for the task fan-out. Created a dedicated 
ForkJoinPool with TASK_FETCH_PARALLELISM=4 and ran the parallelStream inside it 
(standard Java idiom - parallelStream picks up the enclosing pool when called 
from inside a Callable submitted to one). That caps concurrent HF probes at 4, 
regardless of how many task tags HF returns, and stops us from sharing the 
global common pool with the rest of the JVM. Cold-miss /tasks completes in ~13s 
(50 tasks × ~1s each / 4 workers); cached responses are still instant for the 
60-min TTL window.
   
   2. Explicit 429/503 handling in hasModelsForTask:
   
        case 429 | 503 =>
          logger.warn(s"HF rate-limit/unavailable (status ${response.getStatus})
                       when checking task '$task'")
          false
   
   Logged at WARN so it's visible in production logs by default - when 
rate-limit pressure starts becoming a real issue, the warning trail will make 
it obvious. Return value stays false so the affected task just disappears from 
this round's response rather than failing the whole call.
   
   Two related items I deliberately deferred to follow-ups, happy to take 
either in this PR if you'd prefer:
   
   - Retry-with-backoff on 429 (decided against it to keep the rate-limit 
signal visible)
   - Surface partial-failure to the client via a response header (rare enough 
event that the log-only signal feels sufficient)



##########
amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala:
##########
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.resource
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import kong.unirest.Unirest
+
+import javax.ws.rs._
+import javax.ws.rs.core.{MediaType, Response}
+import java.nio.file.{Files, Path => NioPath, Paths}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.stream.Collectors
+import scala.jdk.CollectionConverters._
+
+/**
+  * REST resource that proxies the Hugging Face Hub API to list
+  * models for the HuggingFace operator.
+  *
+  * Browse mode:  GET /api/huggingface/models?task=text-generation
+  *   Fetches ALL models for the task from HF Hub (paginated internally),
+  *   caches the full list server-side, and returns it.
+  *
+  * Search mode:  GET /api/huggingface/models?task=text-generation&search=bert
+  *   Forwards the search query to HF Hub API (searches all models).
+  */
+@Path("/huggingface")
+@Produces(Array(MediaType.APPLICATION_JSON))
+class HuggingFaceModelResource {
+
+  import HuggingFaceModelResource._
+
+  @GET
+  @Path("/models")
+  def listModels(
+      @QueryParam("task") @DefaultValue("text-generation") task: String,
+      @QueryParam("search") search: String
+  ): Response = {
+    try {
+      val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("")
+
+      // ── Search mode: forward query to HF Hub, return results directly ──
+      if (search != null && search.trim.nonEmpty) {
+        return fetchSearchResults(task, search.trim, hfToken)
+      }
+
+      // ── Browse mode: return ALL models for this task (cached) ──
+      val cached = modelCache.get(task)
+      if (cached != null) {
+        return Response.ok(cached).build()
+      }
+
+      // Not cached — fetch all pages from HF Hub API
+      val allModels = fetchAllModelsForTask(task, hfToken)
+      val json = objectMapper.writeValueAsString(allModels)
+      modelCache.put(task, json)
+
+      Response.ok(json).build()
+
+    } catch {
+      case e: Exception =>
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(s"""{"error":"Failed to fetch models: ${e.getMessage}"}""")
+          .build()
+    }
+  }
+
+  @POST
+  @Path("/upload-audio")
+  @Consumes(Array(MediaType.WILDCARD))
+  def uploadAudioReference(
+      @QueryParam("filename") filename: String,
+      bytes: Array[Byte]
+  ): Response = {
+    try {
+      if (bytes == null || bytes.isEmpty) {
+        return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity("""{"error":"Audio payload is empty."}""")
+          .build()
+      }
+
+      val safeFileName = Option(filename)
+        .map(_.trim)
+        .filter(_.nonEmpty)
+        .map(name => Paths.get(name).getFileName.toString)
+        .getOrElse("audio.bin")
+      val extension = {
+        val idx = safeFileName.lastIndexOf('.')
+        if (idx >= 0 && idx < safeFileName.length - 1) 
safeFileName.substring(idx) else ".bin"
+      }

Review Comment:
   Fixes:
   
   Defined a list of real audio extensions (.wav, .mp3, .mpeg, .flac, .ogg, 
.oga, .webm, .opus, .amr, .m4a, .aac). The extracted extension is lowercased 
before checking (so .WAV and .wav both match), and uploads with extensions 
outside the list are rejected with HTTP 400 - before any file is created on 
disk.
   
   The extension list mirrors what inferAudioContentType understands, so any 
extension that passes the check also has a known MIME type for the preview 
endpoint.



##########
amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala:
##########
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.resource
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import kong.unirest.Unirest
+
+import javax.ws.rs._
+import javax.ws.rs.core.{MediaType, Response}
+import java.nio.file.{Files, Path => NioPath, Paths}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.stream.Collectors
+import scala.jdk.CollectionConverters._
+
+/**
+  * REST resource that proxies the Hugging Face Hub API to list
+  * models for the HuggingFace operator.
+  *
+  * Browse mode:  GET /api/huggingface/models?task=text-generation
+  *   Fetches ALL models for the task from HF Hub (paginated internally),
+  *   caches the full list server-side, and returns it.
+  *
+  * Search mode:  GET /api/huggingface/models?task=text-generation&search=bert
+  *   Forwards the search query to HF Hub API (searches all models).
+  */
+@Path("/huggingface")
+@Produces(Array(MediaType.APPLICATION_JSON))
+class HuggingFaceModelResource {
+
+  import HuggingFaceModelResource._
+
+  @GET
+  @Path("/models")
+  def listModels(
+      @QueryParam("task") @DefaultValue("text-generation") task: String,
+      @QueryParam("search") search: String
+  ): Response = {
+    try {
+      val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("")
+
+      // ── Search mode: forward query to HF Hub, return results directly ──
+      if (search != null && search.trim.nonEmpty) {
+        return fetchSearchResults(task, search.trim, hfToken)
+      }
+
+      // ── Browse mode: return ALL models for this task (cached) ──
+      val cached = modelCache.get(task)
+      if (cached != null) {
+        return Response.ok(cached).build()
+      }
+
+      // Not cached — fetch all pages from HF Hub API
+      val allModels = fetchAllModelsForTask(task, hfToken)
+      val json = objectMapper.writeValueAsString(allModels)
+      modelCache.put(task, json)
+
+      Response.ok(json).build()
+
+    } catch {
+      case e: Exception =>
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(s"""{"error":"Failed to fetch models: ${e.getMessage}"}""")
+          .build()
+    }
+  }
+
+  @POST
+  @Path("/upload-audio")
+  @Consumes(Array(MediaType.WILDCARD))
+  def uploadAudioReference(
+      @QueryParam("filename") filename: String,
+      bytes: Array[Byte]
+  ): Response = {
+    try {
+      if (bytes == null || bytes.isEmpty) {
+        return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity("""{"error":"Audio payload is empty."}""")
+          .build()
+      }
+
+      val safeFileName = Option(filename)
+        .map(_.trim)
+        .filter(_.nonEmpty)
+        .map(name => Paths.get(name).getFileName.toString)
+        .getOrElse("audio.bin")
+      val extension = {
+        val idx = safeFileName.lastIndexOf('.')
+        if (idx >= 0 && idx < safeFileName.length - 1) 
safeFileName.substring(idx) else ".bin"
+      }
+
+      val tempDir = Paths.get(System.getProperty("java.io.tmpdir"), 
"texera-hf-audio")
+      Files.createDirectories(tempDir)
+      val tempFile: NioPath = Files.createTempFile(tempDir, "hf-audio-", 
extension)
+      Files.write(tempFile, bytes)
+
+      val json = objectMapper.writeValueAsString(
+        Map(
+          "path" -> tempFile.toAbsolutePath.toString,
+          "fileName" -> safeFileName
+        ).asJava
+      )
+      Response.ok(json).build()
+    } catch {
+      case e: Exception =>
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(s"""{"error":"Failed to upload audio: ${e.getMessage}"}""")
+          .build()
+    }
+  }
+
+  @GET
+  @Path("/audio-preview")
+  def previewUploadedAudio(@QueryParam("path") path: String): Response = {
+    try {
+      val trimmedPath = Option(path).map(_.trim).getOrElse("")
+      if (trimmedPath.isEmpty) {
+        return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity("""{"error":"Audio path is required."}""")
+          .build()
+      }
+
+      val tempDir = Paths
+        .get(System.getProperty("java.io.tmpdir"), "texera-hf-audio")
+        .toAbsolutePath
+        .normalize()
+      val requestedPath = Paths.get(trimmedPath).toAbsolutePath.normalize()
+      if (!requestedPath.startsWith(tempDir)) {
+        return Response
+          .status(Response.Status.FORBIDDEN)
+          .entity("""{"error":"Audio path is outside the allowed preview 
directory."}""")
+          .build()
+      }
+      if (!Files.exists(requestedPath) || !Files.isRegularFile(requestedPath)) 
{
+        return Response
+          .status(Response.Status.NOT_FOUND)
+          .entity("""{"error":"Uploaded audio file was not found."}""")
+          .build()
+      }
+
+      val contentType = Option(Files.probeContentType(requestedPath))
+        .filter(_.trim.nonEmpty)
+        .getOrElse(inferAudioContentType(requestedPath))
+      Response.ok(Files.readAllBytes(requestedPath), contentType).build()
+    } catch {
+      case e: Exception =>
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(s"""{"error":"Failed to read uploaded audio: 
${e.getMessage}"}""")
+          .build()
+    }
+  }
+
+  @GET
+  @Path("/media-proxy")
+  def proxyRemoteMedia(@QueryParam("url") url: String): Response = {
+    try {
+      val trimmedUrl = Option(url).map(_.trim).getOrElse("")
+      if (trimmedUrl.isEmpty) {
+        return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity("""{"error":"Media URL is required."}""")
+          .build()
+      }
+      if (!trimmedUrl.startsWith("http://";) && 
!trimmedUrl.startsWith("https://";)) {
+        return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity("""{"error":"Only http(s) media URLs are supported."}""")
+          .build()
+      }
+
+      val upstreamResponse = Unirest
+        .get(trimmedUrl)
+        .connectTimeout(10000)
+        .socketTimeout(120000)
+        .asBytes()
+
+      if (upstreamResponse.getStatus != 200) {
+        return Response
+          .status(upstreamResponse.getStatus)
+          .entity(
+            s"""{"error":"Failed to fetch remote media: 
${upstreamResponse.getStatusText}"}"""
+          )
+          .build()
+      }
+
+      val contentType = 
Option(upstreamResponse.getHeaders.getFirst("Content-Type"))
+        .filter(_.trim.nonEmpty)
+        .getOrElse(MediaType.APPLICATION_OCTET_STREAM)
+      Response.ok(upstreamResponse.getBody, contentType).build()
+    } catch {
+      case e: Exception =>
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(s"""{"error":"Failed to proxy remote media: 
${e.getMessage}"}""")
+          .build()
+    }
+  }
+
+  /** Search HF Hub for models matching a query within a task. */
+  private def fetchSearchResults(task: String, query: String, hfToken: 
String): Response = {
+    var request = Unirest
+      .get("https://huggingface.co/api/models";)
+      .queryString("pipeline_tag", task)
+      .queryString("sort", "downloads")
+      .queryString("direction", "-1")
+      .queryString("limit", "100")
+      .queryString("filter", task)
+      .queryString("inference", "warm")
+      .queryString("search", query)

Review Comment:
   Same fix as L21-24 applied here. fetchSearchResults was the only Unirest 
call site in the file without timeouts; added the standard pair:
   
   - .connectTimeout(CONNECT_TIMEOUT_MS)   // 10s - same as listTasks, 
listModels
   - .socketTimeout(SOCKET_TIMEOUT_MS)     // 30s
   
   Now the search method matches the timeout policy used everywhere else in the 
file (see L21-24 thread for the broader Unirest configuration approach using 
shared constants).



##########
amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala:
##########
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.resource
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import kong.unirest.Unirest

Review Comment:
   I added shared timeout constants in the companion object and applied them 
explicitly at every call site:
   
   - CONNECT_TIMEOUT_MS = 10s / SOCKET_TIMEOUT_MS = 30s - default for HF Hub 
calls (`/models`, `/tasks`, browse pagination).
   - CONNECT_TIMEOUT_SHORT_MS = 5s / SOCKET_TIMEOUT_SHORT_MS = 10s - fast probe 
in `hasModelsForTask` so the `/tasks` fan-out doesn't drag.
   - SOCKET_TIMEOUT_LONG_MS = 120s - `/media-proxy` downloads of larger media 
files.
   
   Per-call is intentional: different endpoints need different limits, and 
keeping it visible at the call site makes the policy reviewable inline. 
Connection pool size and retries are left at the defaults for now.



##########
amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala:
##########
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.resource
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import kong.unirest.Unirest
+
+import javax.ws.rs._
+import javax.ws.rs.core.{MediaType, Response}
+import java.nio.file.{Files, Path => NioPath, Paths}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.stream.Collectors
+import scala.jdk.CollectionConverters._
+
+/**
+  * REST resource that proxies the Hugging Face Hub API to list
+  * models for the HuggingFace operator.
+  *
+  * Browse mode:  GET /api/huggingface/models?task=text-generation
+  *   Fetches ALL models for the task from HF Hub (paginated internally),
+  *   caches the full list server-side, and returns it.
+  *
+  * Search mode:  GET /api/huggingface/models?task=text-generation&search=bert
+  *   Forwards the search query to HF Hub API (searches all models).
+  */
+@Path("/huggingface")
+@Produces(Array(MediaType.APPLICATION_JSON))
+class HuggingFaceModelResource {
+
+  import HuggingFaceModelResource._
+
+  @GET
+  @Path("/models")
+  def listModels(
+      @QueryParam("task") @DefaultValue("text-generation") task: String,
+      @QueryParam("search") search: String
+  ): Response = {
+    try {
+      val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("")
+
+      // ── Search mode: forward query to HF Hub, return results directly ──
+      if (search != null && search.trim.nonEmpty) {
+        return fetchSearchResults(task, search.trim, hfToken)
+      }
+
+      // ── Browse mode: return ALL models for this task (cached) ──
+      val cached = modelCache.get(task)
+      if (cached != null) {
+        return Response.ok(cached).build()
+      }
+
+      // Not cached — fetch all pages from HF Hub API
+      val allModels = fetchAllModelsForTask(task, hfToken)
+      val json = objectMapper.writeValueAsString(allModels)
+      modelCache.put(task, json)

Review Comment:
   Fixed by switching both `modelCache` and `taskCache` to Guava's 
`CacheBuilder`:
   
   - `modelCache`: maximumSize=100, expireAfterWrite=60 min
   - `taskCache`:  maximumSize=8,   expireAfterWrite=60 min
   
   100 entries covers every HF pipeline task tag (HF has ~50-100 total); if we 
ever exceed it, Guava LRU-evicts the least-recently-used entry. The TTL bounds 
staleness at 60 minutes - HF Hub doesn't change fast enough
   for this to be visible in the picker UX.
   
   Note that the cache is also now bypassed when a request carries a user token 
(X-HF-Token header), so only anonymous public results ever live in the cache.
   
   Picked Guava over Caffeine because it's already on the classpath; happy to 
switch to Caffeine in a follow-up if you'd prefer the more modern 
implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to