Ma77Ball commented on code in PR #5124: URL: https://github.com/apache/texera/pull/5124#discussion_r3261558944
########## amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala: ########## @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.resource + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import kong.unirest.Unirest + +import javax.ws.rs._ +import javax.ws.rs.core.{MediaType, Response} +import java.nio.file.{Files, Path => NioPath, Paths} +import java.util.concurrent.ConcurrentHashMap +import java.util.stream.Collectors +import scala.jdk.CollectionConverters._ + +/** + * REST resource that proxies the Hugging Face Hub API to list + * models for the HuggingFace operator. + * + * Browse mode: GET /api/huggingface/models?task=text-generation + * Fetches ALL models for the task from HF Hub (paginated internally), + * caches the full list server-side, and returns it. + * + * Search mode: GET /api/huggingface/models?task=text-generation&search=bert + * Forwards the search query to HF Hub API (searches all models). 
+ */ +@Path("/huggingface") +@Produces(Array(MediaType.APPLICATION_JSON)) +class HuggingFaceModelResource { + + import HuggingFaceModelResource._ + + @GET + @Path("/models") + def listModels( + @QueryParam("task") @DefaultValue("text-generation") task: String, + @QueryParam("search") search: String + ): Response = { + try { + val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("") Review Comment: How does the user add their token to the system? Is there a future PR planned that will allow the user to specify a token in settings or in the operator itself? ########## amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala: ########## @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.resource + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import kong.unirest.Unirest Review Comment: Unirest is used throughout the file, so it should be given explicit configuration settings, as its default settings might not be optimal. 
########## amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala: ########## @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.resource + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import kong.unirest.Unirest + +import javax.ws.rs._ +import javax.ws.rs.core.{MediaType, Response} +import java.nio.file.{Files, Path => NioPath, Paths} +import java.util.concurrent.ConcurrentHashMap +import java.util.stream.Collectors +import scala.jdk.CollectionConverters._ + +/** + * REST resource that proxies the Hugging Face Hub API to list + * models for the HuggingFace operator. + * + * Browse mode: GET /api/huggingface/models?task=text-generation + * Fetches ALL models for the task from HF Hub (paginated internally), + * caches the full list server-side, and returns it. + * + * Search mode: GET /api/huggingface/models?task=text-generation&search=bert + * Forwards the search query to HF Hub API (searches all models). 
+ */ +@Path("/huggingface") +@Produces(Array(MediaType.APPLICATION_JSON)) +class HuggingFaceModelResource { + + import HuggingFaceModelResource._ + + @GET + @Path("/models") + def listModels( + @QueryParam("task") @DefaultValue("text-generation") task: String, + @QueryParam("search") search: String + ): Response = { + try { + val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("") + + // ── Search mode: forward query to HF Hub, return results directly ── + if (search != null && search.trim.nonEmpty) { + return fetchSearchResults(task, search.trim, hfToken) + } + + // ── Browse mode: return ALL models for this task (cached) ── + val cached = modelCache.get(task) + if (cached != null) { + return Response.ok(cached).build() + } + + // Not cached — fetch all pages from HF Hub API + val allModels = fetchAllModelsForTask(task, hfToken) + val json = objectMapper.writeValueAsString(allModels) + modelCache.put(task, json) + + Response.ok(json).build() + + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to fetch models: ${e.getMessage}"}""") + .build() + } + } + + @POST + @Path("/upload-audio") + @Consumes(Array(MediaType.WILDCARD)) + def uploadAudioReference( + @QueryParam("filename") filename: String, + bytes: Array[Byte] + ): Response = { + try { + if (bytes == null || bytes.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Audio payload is empty."}""") + .build() + } + + val safeFileName = Option(filename) + .map(_.trim) + .filter(_.nonEmpty) + .map(name => Paths.get(name).getFileName.toString) + .getOrElse("audio.bin") + val extension = { + val idx = safeFileName.lastIndexOf('.') + if (idx >= 0 && idx < safeFileName.length - 1) safeFileName.substring(idx) else ".bin" + } + + val tempDir = Paths.get(System.getProperty("java.io.tmpdir"), "texera-hf-audio") + Files.createDirectories(tempDir) + val tempFile: NioPath = Files.createTempFile(tempDir, 
"hf-audio-", extension) + Files.write(tempFile, bytes) + + val json = objectMapper.writeValueAsString( + Map( + "path" -> tempFile.toAbsolutePath.toString, + "fileName" -> safeFileName + ).asJava + ) + Response.ok(json).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to upload audio: ${e.getMessage}"}""") + .build() + } + } + + @GET + @Path("/audio-preview") + def previewUploadedAudio(@QueryParam("path") path: String): Response = { + try { + val trimmedPath = Option(path).map(_.trim).getOrElse("") + if (trimmedPath.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Audio path is required."}""") + .build() + } + + val tempDir = Paths + .get(System.getProperty("java.io.tmpdir"), "texera-hf-audio") + .toAbsolutePath + .normalize() + val requestedPath = Paths.get(trimmedPath).toAbsolutePath.normalize() + if (!requestedPath.startsWith(tempDir)) { + return Response + .status(Response.Status.FORBIDDEN) + .entity("""{"error":"Audio path is outside the allowed preview directory."}""") + .build() + } + if (!Files.exists(requestedPath) || !Files.isRegularFile(requestedPath)) { + return Response + .status(Response.Status.NOT_FOUND) + .entity("""{"error":"Uploaded audio file was not found."}""") + .build() + } + + val contentType = Option(Files.probeContentType(requestedPath)) + .filter(_.trim.nonEmpty) + .getOrElse(inferAudioContentType(requestedPath)) + Response.ok(Files.readAllBytes(requestedPath), contentType).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to read uploaded audio: ${e.getMessage}"}""") + .build() + } + } + + @GET + @Path("/media-proxy") + def proxyRemoteMedia(@QueryParam("url") url: String): Response = { + try { + val trimmedUrl = Option(url).map(_.trim).getOrElse("") + if (trimmedUrl.isEmpty) { + return Response + 
.status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Media URL is required."}""") + .build() + } + if (!trimmedUrl.startsWith("http://") && !trimmedUrl.startsWith("https://")) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Only http(s) media URLs are supported."}""") + .build() + } + + val upstreamResponse = Unirest + .get(trimmedUrl) + .connectTimeout(10000) + .socketTimeout(120000) + .asBytes() + + if (upstreamResponse.getStatus != 200) { + return Response + .status(upstreamResponse.getStatus) + .entity( + s"""{"error":"Failed to fetch remote media: ${upstreamResponse.getStatusText}"}""" + ) + .build() + } + + val contentType = Option(upstreamResponse.getHeaders.getFirst("Content-Type")) + .filter(_.trim.nonEmpty) + .getOrElse(MediaType.APPLICATION_OCTET_STREAM) + Response.ok(upstreamResponse.getBody, contentType).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to proxy remote media: ${e.getMessage}"}""") + .build() + } + } + + /** Search HF Hub for models matching a query within a task. 
*/ + private def fetchSearchResults(task: String, query: String, hfToken: String): Response = { + var request = Unirest + .get("https://huggingface.co/api/models") + .queryString("pipeline_tag", task) + .queryString("sort", "downloads") + .queryString("direction", "-1") + .queryString("limit", "100") + .queryString("filter", task) + .queryString("inference", "warm") + .queryString("search", query) + + if (hfToken.nonEmpty) { + request = request.header("Authorization", s"Bearer $hfToken") + } + + val hfResponse = request.asString() + + if (hfResponse.getStatus != 200) { + return Response + .status(hfResponse.getStatus) + .entity(s"""{"error":"Hugging Face API error: ${hfResponse.getStatusText}"}""") + .build() + } + + val rawModels = objectMapper.readValue(hfResponse.getBody, listOfMapsType) + val out = buildSimplifiedList(rawModels) + Response.ok(objectMapper.writeValueAsString(out)).build() + } + + /** + * Fetch pipeline task tags from the Hugging Face Hub API. + * GET /api/huggingface/tasks + * + * Returns a JSON array of objects: [{ "tag": "text-generation", "label": "Text Generation" }, ...] + * The result is cached server-side for the lifetime of the process. + */ + @GET + @Path("/tasks") + def listTasks(): Response = { + try { + val cached = taskCache.get("all") + if (cached != null) { + return Response.ok(cached).build() + } + + val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("") + var request = Unirest + .get("https://huggingface.co/api/tasks") + .connectTimeout(10000) + .socketTimeout(15000) + + if (hfToken.nonEmpty) { + request = request.header("Authorization", s"Bearer $hfToken") + } + + val hfResponse = request.asString() + + if (hfResponse.getStatus != 200) { + return Response + .status(hfResponse.getStatus) + .entity(s"""{"error":"Hugging Face API error: ${hfResponse.getStatusText}"}""") + .build() + } + + // /api/tasks returns a JSON object: { "<pipeline_tag>": { "label": "...", ... }, ... 
} + // Using readTree so no entry is dropped regardless of its value type (null, array, etc.) + val root: JsonNode = objectMapper.readTree(hfResponse.getBody) + val taskList = new java.util.ArrayList[java.util.Map[String, Object]]() + val iter = root.fields() + while (iter.hasNext) { + val entry = iter.next() + val tag = entry.getKey + val info: JsonNode = entry.getValue + val label = + if (info != null && info.isObject && info.has("label")) info.get("label").asText(tag) + else tag + val taskEntry = new java.util.LinkedHashMap[String, Object]() + taskEntry.put("tag", tag) + taskEntry.put("label", label) + taskList.add(taskEntry) + } + + // Filter out tasks that have no models available with hosted inference + val availableTasks = taskList + .parallelStream() + .filter(task => hasModelsForTask(task.get("tag").toString, hfToken)) + .collect(Collectors.toList()) Review Comment: Is there a better way to send a request to HuggingFace without opening 50 concurrent requests to get available tasks on HuggingFace? ########## amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala: ########## @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.web.resource + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import kong.unirest.Unirest + +import javax.ws.rs._ +import javax.ws.rs.core.{MediaType, Response} +import java.nio.file.{Files, Path => NioPath, Paths} +import java.util.concurrent.ConcurrentHashMap +import java.util.stream.Collectors +import scala.jdk.CollectionConverters._ + +/** + * REST resource that proxies the Hugging Face Hub API to list + * models for the HuggingFace operator. + * + * Browse mode: GET /api/huggingface/models?task=text-generation + * Fetches ALL models for the task from HF Hub (paginated internally), + * caches the full list server-side, and returns it. + * + * Search mode: GET /api/huggingface/models?task=text-generation&search=bert + * Forwards the search query to HF Hub API (searches all models). + */ +@Path("/huggingface") +@Produces(Array(MediaType.APPLICATION_JSON)) +class HuggingFaceModelResource { + + import HuggingFaceModelResource._ + + @GET + @Path("/models") + def listModels( + @QueryParam("task") @DefaultValue("text-generation") task: String, + @QueryParam("search") search: String + ): Response = { + try { + val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("") + + // ── Search mode: forward query to HF Hub, return results directly ── + if (search != null && search.trim.nonEmpty) { + return fetchSearchResults(task, search.trim, hfToken) + } + + // ── Browse mode: return ALL models for this task (cached) ── + val cached = modelCache.get(task) + if (cached != null) { + return Response.ok(cached).build() + } + + // Not cached — fetch all pages from HF Hub API + val allModels = fetchAllModelsForTask(task, hfToken) + val json = objectMapper.writeValueAsString(allModels) + modelCache.put(task, json) + + Response.ok(json).build() + + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to 
fetch models: ${e.getMessage}"}""") + .build() + } + } + + @POST + @Path("/upload-audio") + @Consumes(Array(MediaType.WILDCARD)) + def uploadAudioReference( + @QueryParam("filename") filename: String, + bytes: Array[Byte] + ): Response = { + try { + if (bytes == null || bytes.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Audio payload is empty."}""") + .build() + } + + val safeFileName = Option(filename) + .map(_.trim) + .filter(_.nonEmpty) + .map(name => Paths.get(name).getFileName.toString) + .getOrElse("audio.bin") + val extension = { + val idx = safeFileName.lastIndexOf('.') + if (idx >= 0 && idx < safeFileName.length - 1) safeFileName.substring(idx) else ".bin" + } Review Comment: For example, requests like these should be prevented: ``` POST /upload-audio?filename=evil.sh POST /upload-audio?filename=trick.html POST /upload-audio?filename=run.bat ``` ########## amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala: ########## @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.web.resource + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import kong.unirest.Unirest + +import javax.ws.rs._ +import javax.ws.rs.core.{MediaType, Response} +import java.nio.file.{Files, Path => NioPath, Paths} +import java.util.concurrent.ConcurrentHashMap +import java.util.stream.Collectors +import scala.jdk.CollectionConverters._ + +/** + * REST resource that proxies the Hugging Face Hub API to list + * models for the HuggingFace operator. + * + * Browse mode: GET /api/huggingface/models?task=text-generation + * Fetches ALL models for the task from HF Hub (paginated internally), + * caches the full list server-side, and returns it. + * + * Search mode: GET /api/huggingface/models?task=text-generation&search=bert + * Forwards the search query to HF Hub API (searches all models). + */ +@Path("/huggingface") +@Produces(Array(MediaType.APPLICATION_JSON)) +class HuggingFaceModelResource { + + import HuggingFaceModelResource._ + + @GET + @Path("/models") + def listModels( + @QueryParam("task") @DefaultValue("text-generation") task: String, + @QueryParam("search") search: String + ): Response = { + try { + val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("") + + // ── Search mode: forward query to HF Hub, return results directly ── + if (search != null && search.trim.nonEmpty) { + return fetchSearchResults(task, search.trim, hfToken) + } + + // ── Browse mode: return ALL models for this task (cached) ── + val cached = modelCache.get(task) + if (cached != null) { + return Response.ok(cached).build() + } + + // Not cached — fetch all pages from HF Hub API + val allModels = fetchAllModelsForTask(task, hfToken) + val json = objectMapper.writeValueAsString(allModels) + modelCache.put(task, json) + + Response.ok(json).build() + + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to 
fetch models: ${e.getMessage}"}""") + .build() + } + } + + @POST + @Path("/upload-audio") + @Consumes(Array(MediaType.WILDCARD)) + def uploadAudioReference( + @QueryParam("filename") filename: String, + bytes: Array[Byte] + ): Response = { + try { + if (bytes == null || bytes.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Audio payload is empty."}""") + .build() + } + + val safeFileName = Option(filename) + .map(_.trim) + .filter(_.nonEmpty) + .map(name => Paths.get(name).getFileName.toString) + .getOrElse("audio.bin") + val extension = { + val idx = safeFileName.lastIndexOf('.') + if (idx >= 0 && idx < safeFileName.length - 1) safeFileName.substring(idx) else ".bin" + } + + val tempDir = Paths.get(System.getProperty("java.io.tmpdir"), "texera-hf-audio") + Files.createDirectories(tempDir) + val tempFile: NioPath = Files.createTempFile(tempDir, "hf-audio-", extension) + Files.write(tempFile, bytes) + + val json = objectMapper.writeValueAsString( + Map( + "path" -> tempFile.toAbsolutePath.toString, + "fileName" -> safeFileName + ).asJava + ) + Response.ok(json).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to upload audio: ${e.getMessage}"}""") + .build() + } + } + + @GET + @Path("/audio-preview") + def previewUploadedAudio(@QueryParam("path") path: String): Response = { + try { + val trimmedPath = Option(path).map(_.trim).getOrElse("") + if (trimmedPath.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Audio path is required."}""") + .build() + } + + val tempDir = Paths + .get(System.getProperty("java.io.tmpdir"), "texera-hf-audio") + .toAbsolutePath + .normalize() + val requestedPath = Paths.get(trimmedPath).toAbsolutePath.normalize() + if (!requestedPath.startsWith(tempDir)) { + return Response + .status(Response.Status.FORBIDDEN) + .entity("""{"error":"Audio path is outside the allowed preview 
directory."}""") + .build() + } + if (!Files.exists(requestedPath) || !Files.isRegularFile(requestedPath)) { + return Response + .status(Response.Status.NOT_FOUND) + .entity("""{"error":"Uploaded audio file was not found."}""") + .build() + } + + val contentType = Option(Files.probeContentType(requestedPath)) + .filter(_.trim.nonEmpty) + .getOrElse(inferAudioContentType(requestedPath)) + Response.ok(Files.readAllBytes(requestedPath), contentType).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to read uploaded audio: ${e.getMessage}"}""") + .build() + } + } + + @GET + @Path("/media-proxy") + def proxyRemoteMedia(@QueryParam("url") url: String): Response = { + try { + val trimmedUrl = Option(url).map(_.trim).getOrElse("") + if (trimmedUrl.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Media URL is required."}""") + .build() + } + if (!trimmedUrl.startsWith("http://") && !trimmedUrl.startsWith("https://")) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Only http(s) media URLs are supported."}""") + .build() + } + + val upstreamResponse = Unirest + .get(trimmedUrl) + .connectTimeout(10000) + .socketTimeout(120000) + .asBytes() + + if (upstreamResponse.getStatus != 200) { + return Response + .status(upstreamResponse.getStatus) + .entity( + s"""{"error":"Failed to fetch remote media: ${upstreamResponse.getStatusText}"}""" + ) + .build() + } + + val contentType = Option(upstreamResponse.getHeaders.getFirst("Content-Type")) + .filter(_.trim.nonEmpty) + .getOrElse(MediaType.APPLICATION_OCTET_STREAM) + Response.ok(upstreamResponse.getBody, contentType).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to proxy remote media: ${e.getMessage}"}""") + .build() + } + } + + /** Search HF Hub for models matching a query within a task. 
*/ + private def fetchSearchResults(task: String, query: String, hfToken: String): Response = { + var request = Unirest + .get("https://huggingface.co/api/models") + .queryString("pipeline_tag", task) + .queryString("sort", "downloads") + .queryString("direction", "-1") + .queryString("limit", "100") + .queryString("filter", task) + .queryString("inference", "warm") + .queryString("search", query) + + if (hfToken.nonEmpty) { + request = request.header("Authorization", s"Bearer $hfToken") + } + + val hfResponse = request.asString() + + if (hfResponse.getStatus != 200) { + return Response + .status(hfResponse.getStatus) + .entity(s"""{"error":"Hugging Face API error: ${hfResponse.getStatusText}"}""") + .build() + } + + val rawModels = objectMapper.readValue(hfResponse.getBody, listOfMapsType) + val out = buildSimplifiedList(rawModels) + Response.ok(objectMapper.writeValueAsString(out)).build() + } + + /** + * Fetch pipeline task tags from the Hugging Face Hub API. + * GET /api/huggingface/tasks + * + * Returns a JSON array of objects: [{ "tag": "text-generation", "label": "Text Generation" }, ...] + * The result is cached server-side for the lifetime of the process. + */ + @GET + @Path("/tasks") + def listTasks(): Response = { + try { + val cached = taskCache.get("all") + if (cached != null) { + return Response.ok(cached).build() + } + + val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("") + var request = Unirest + .get("https://huggingface.co/api/tasks") + .connectTimeout(10000) + .socketTimeout(15000) + + if (hfToken.nonEmpty) { + request = request.header("Authorization", s"Bearer $hfToken") + } + + val hfResponse = request.asString() + + if (hfResponse.getStatus != 200) { + return Response + .status(hfResponse.getStatus) + .entity(s"""{"error":"Hugging Face API error: ${hfResponse.getStatusText}"}""") + .build() + } + + // /api/tasks returns a JSON object: { "<pipeline_tag>": { "label": "...", ... }, ... 
} + // Using readTree so no entry is dropped regardless of its value type (null, array, etc.) + val root: JsonNode = objectMapper.readTree(hfResponse.getBody) + val taskList = new java.util.ArrayList[java.util.Map[String, Object]]() + val iter = root.fields() + while (iter.hasNext) { + val entry = iter.next() + val tag = entry.getKey + val info: JsonNode = entry.getValue + val label = + if (info != null && info.isObject && info.has("label")) info.get("label").asText(tag) + else tag + val taskEntry = new java.util.LinkedHashMap[String, Object]() + taskEntry.put("tag", tag) + taskEntry.put("label", label) + taskList.add(taskEntry) + } + + // Filter out tasks that have no models available with hosted inference + val availableTasks = taskList + .parallelStream() + .filter(task => hasModelsForTask(task.get("tag").toString, hfToken)) + .collect(Collectors.toList()) + + val json = objectMapper.writeValueAsString(availableTasks) + taskCache.put("all", json) + Response.ok(json).build() + + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to fetch tasks: ${e.getMessage}"}""") + .build() + } + } + + /** + * Fetch ALL models for a given task by paginating through the HF Hub API. + * HF Hub uses a Link header with rel="next" for pagination. + * We fetch pages of 1000 models at a time, up to MAX_PAGES pages. + */ + private def fetchAllModelsForTask( + task: String, + hfToken: String + ): java.util.List[java.util.Map[String, Object]] = { + val allResults = new java.util.ArrayList[java.util.Map[String, Object]]() + var nextUrl: String = null + var pageCount = 0 Review Comment: MAX_PAGES does not exist in this PR, so what is pageCount used for, and is it needed in this PR? ########## amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala: ########## @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.resource + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import kong.unirest.Unirest + +import javax.ws.rs._ +import javax.ws.rs.core.{MediaType, Response} +import java.nio.file.{Files, Path => NioPath, Paths} +import java.util.concurrent.ConcurrentHashMap +import java.util.stream.Collectors +import scala.jdk.CollectionConverters._ + +/** + * REST resource that proxies the Hugging Face Hub API to list + * models for the HuggingFace operator. + * + * Browse mode: GET /api/huggingface/models?task=text-generation + * Fetches ALL models for the task from HF Hub (paginated internally), + * caches the full list server-side, and returns it. + * + * Search mode: GET /api/huggingface/models?task=text-generation&search=bert + * Forwards the search query to HF Hub API (searches all models). 
+ */ +@Path("/huggingface") +@Produces(Array(MediaType.APPLICATION_JSON)) +class HuggingFaceModelResource { + + import HuggingFaceModelResource._ + + @GET + @Path("/models") + def listModels( + @QueryParam("task") @DefaultValue("text-generation") task: String, + @QueryParam("search") search: String + ): Response = { + try { + val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("") + + // ── Search mode: forward query to HF Hub, return results directly ── + if (search != null && search.trim.nonEmpty) { + return fetchSearchResults(task, search.trim, hfToken) + } + + // ── Browse mode: return ALL models for this task (cached) ── + val cached = modelCache.get(task) + if (cached != null) { + return Response.ok(cached).build() + } + + // Not cached — fetch all pages from HF Hub API + val allModels = fetchAllModelsForTask(task, hfToken) + val json = objectMapper.writeValueAsString(allModels) + modelCache.put(task, json) + + Response.ok(json).build() + + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to fetch models: ${e.getMessage}"}""") + .build() + } + } + + @POST + @Path("/upload-audio") + @Consumes(Array(MediaType.WILDCARD)) + def uploadAudioReference( + @QueryParam("filename") filename: String, + bytes: Array[Byte] + ): Response = { + try { + if (bytes == null || bytes.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Audio payload is empty."}""") + .build() + } + + val safeFileName = Option(filename) + .map(_.trim) + .filter(_.nonEmpty) + .map(name => Paths.get(name).getFileName.toString) + .getOrElse("audio.bin") + val extension = { + val idx = safeFileName.lastIndexOf('.') + if (idx >= 0 && idx < safeFileName.length - 1) safeFileName.substring(idx) else ".bin" + } + + val tempDir = Paths.get(System.getProperty("java.io.tmpdir"), "texera-hf-audio") Review Comment: Please add a way to clean up this folder when no longer needed. 
########## amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala: ########## @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.resource + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import kong.unirest.Unirest + +import javax.ws.rs._ +import javax.ws.rs.core.{MediaType, Response} +import java.nio.file.{Files, Path => NioPath, Paths} +import java.util.concurrent.ConcurrentHashMap +import java.util.stream.Collectors +import scala.jdk.CollectionConverters._ + +/** + * REST resource that proxies the Hugging Face Hub API to list + * models for the HuggingFace operator. + * + * Browse mode: GET /api/huggingface/models?task=text-generation + * Fetches ALL models for the task from HF Hub (paginated internally), + * caches the full list server-side, and returns it. + * + * Search mode: GET /api/huggingface/models?task=text-generation&search=bert + * Forwards the search query to HF Hub API (searches all models). 
+ */ +@Path("/huggingface") +@Produces(Array(MediaType.APPLICATION_JSON)) +class HuggingFaceModelResource { + + import HuggingFaceModelResource._ + + @GET + @Path("/models") + def listModels( + @QueryParam("task") @DefaultValue("text-generation") task: String, + @QueryParam("search") search: String + ): Response = { + try { + val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("") + + // ── Search mode: forward query to HF Hub, return results directly ── + if (search != null && search.trim.nonEmpty) { + return fetchSearchResults(task, search.trim, hfToken) + } + + // ── Browse mode: return ALL models for this task (cached) ── + val cached = modelCache.get(task) + if (cached != null) { + return Response.ok(cached).build() + } + + // Not cached — fetch all pages from HF Hub API + val allModels = fetchAllModelsForTask(task, hfToken) + val json = objectMapper.writeValueAsString(allModels) + modelCache.put(task, json) + + Response.ok(json).build() + + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to fetch models: ${e.getMessage}"}""") + .build() + } + } + + @POST + @Path("/upload-audio") + @Consumes(Array(MediaType.WILDCARD)) + def uploadAudioReference( + @QueryParam("filename") filename: String, + bytes: Array[Byte] Review Comment: There should be a size limit to prevent users from posting overly large audio files that will use up all the RAM. Please either implement a hard limit or improve how we handle large audio files so we don't store them in memory before writing to disk. ########## amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala: ########## @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.resource + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import kong.unirest.Unirest + +import javax.ws.rs._ +import javax.ws.rs.core.{MediaType, Response} +import java.nio.file.{Files, Path => NioPath, Paths} +import java.util.concurrent.ConcurrentHashMap +import java.util.stream.Collectors +import scala.jdk.CollectionConverters._ + +/** + * REST resource that proxies the Hugging Face Hub API to list + * models for the HuggingFace operator. + * + * Browse mode: GET /api/huggingface/models?task=text-generation + * Fetches ALL models for the task from HF Hub (paginated internally), + * caches the full list server-side, and returns it. + * + * Search mode: GET /api/huggingface/models?task=text-generation&search=bert + * Forwards the search query to HF Hub API (searches all models). 
+ */ +@Path("/huggingface") +@Produces(Array(MediaType.APPLICATION_JSON)) +class HuggingFaceModelResource { + + import HuggingFaceModelResource._ + + @GET + @Path("/models") + def listModels( + @QueryParam("task") @DefaultValue("text-generation") task: String, + @QueryParam("search") search: String + ): Response = { + try { + val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("") + + // ── Search mode: forward query to HF Hub, return results directly ── + if (search != null && search.trim.nonEmpty) { + return fetchSearchResults(task, search.trim, hfToken) + } + + // ── Browse mode: return ALL models for this task (cached) ── + val cached = modelCache.get(task) + if (cached != null) { + return Response.ok(cached).build() + } + + // Not cached — fetch all pages from HF Hub API + val allModels = fetchAllModelsForTask(task, hfToken) + val json = objectMapper.writeValueAsString(allModels) + modelCache.put(task, json) + + Response.ok(json).build() + + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to fetch models: ${e.getMessage}"}""") + .build() + } + } + + @POST + @Path("/upload-audio") + @Consumes(Array(MediaType.WILDCARD)) + def uploadAudioReference( + @QueryParam("filename") filename: String, + bytes: Array[Byte] + ): Response = { + try { + if (bytes == null || bytes.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Audio payload is empty."}""") + .build() + } + + val safeFileName = Option(filename) + .map(_.trim) + .filter(_.nonEmpty) + .map(name => Paths.get(name).getFileName.toString) + .getOrElse("audio.bin") + val extension = { + val idx = safeFileName.lastIndexOf('.') + if (idx >= 0 && idx < safeFileName.length - 1) safeFileName.substring(idx) else ".bin" + } + + val tempDir = Paths.get(System.getProperty("java.io.tmpdir"), "texera-hf-audio") + Files.createDirectories(tempDir) + val tempFile: NioPath = Files.createTempFile(tempDir, 
"hf-audio-", extension) + Files.write(tempFile, bytes) + + val json = objectMapper.writeValueAsString( + Map( + "path" -> tempFile.toAbsolutePath.toString, + "fileName" -> safeFileName + ).asJava + ) + Response.ok(json).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to upload audio: ${e.getMessage}"}""") + .build() + } + } + + @GET + @Path("/audio-preview") + def previewUploadedAudio(@QueryParam("path") path: String): Response = { + try { + val trimmedPath = Option(path).map(_.trim).getOrElse("") + if (trimmedPath.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Audio path is required."}""") + .build() + } + + val tempDir = Paths + .get(System.getProperty("java.io.tmpdir"), "texera-hf-audio") + .toAbsolutePath + .normalize() + val requestedPath = Paths.get(trimmedPath).toAbsolutePath.normalize() + if (!requestedPath.startsWith(tempDir)) { + return Response + .status(Response.Status.FORBIDDEN) + .entity("""{"error":"Audio path is outside the allowed preview directory."}""") + .build() + } + if (!Files.exists(requestedPath) || !Files.isRegularFile(requestedPath)) { + return Response + .status(Response.Status.NOT_FOUND) + .entity("""{"error":"Uploaded audio file was not found."}""") + .build() + } + + val contentType = Option(Files.probeContentType(requestedPath)) + .filter(_.trim.nonEmpty) + .getOrElse(inferAudioContentType(requestedPath)) + Response.ok(Files.readAllBytes(requestedPath), contentType).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to read uploaded audio: ${e.getMessage}"}""") + .build() + } + } + + @GET + @Path("/media-proxy") + def proxyRemoteMedia(@QueryParam("url") url: String): Response = { + try { + val trimmedUrl = Option(url).map(_.trim).getOrElse("") + if (trimmedUrl.isEmpty) { + return Response + 
.status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Media URL is required."}""") + .build() + } + if (!trimmedUrl.startsWith("http://") && !trimmedUrl.startsWith("https://")) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Only http(s) media URLs are supported."}""") + .build() + } + + val upstreamResponse = Unirest + .get(trimmedUrl) + .connectTimeout(10000) + .socketTimeout(120000) + .asBytes() + + if (upstreamResponse.getStatus != 200) { + return Response + .status(upstreamResponse.getStatus) + .entity( + s"""{"error":"Failed to fetch remote media: ${upstreamResponse.getStatusText}"}""" + ) + .build() + } + + val contentType = Option(upstreamResponse.getHeaders.getFirst("Content-Type")) + .filter(_.trim.nonEmpty) + .getOrElse(MediaType.APPLICATION_OCTET_STREAM) + Response.ok(upstreamResponse.getBody, contentType).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to proxy remote media: ${e.getMessage}"}""") + .build() + } + } + + /** Search HF Hub for models matching a query within a task. */ + private def fetchSearchResults(task: String, query: String, hfToken: String): Response = { + var request = Unirest + .get("https://huggingface.co/api/models") + .queryString("pipeline_tag", task) + .queryString("sort", "downloads") + .queryString("direction", "-1") + .queryString("limit", "100") + .queryString("filter", task) + .queryString("inference", "warm") + .queryString("search", query) Review Comment: A connection timeout should be added here, similar to the one set in fetchAllModelsForTask. ########## amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala: ########## @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.resource + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import kong.unirest.Unirest + +import javax.ws.rs._ +import javax.ws.rs.core.{MediaType, Response} +import java.nio.file.{Files, Path => NioPath, Paths} +import java.util.concurrent.ConcurrentHashMap +import java.util.stream.Collectors +import scala.jdk.CollectionConverters._ + +/** + * REST resource that proxies the Hugging Face Hub API to list + * models for the HuggingFace operator. + * + * Browse mode: GET /api/huggingface/models?task=text-generation + * Fetches ALL models for the task from HF Hub (paginated internally), + * caches the full list server-side, and returns it. + * + * Search mode: GET /api/huggingface/models?task=text-generation&search=bert + * Forwards the search query to HF Hub API (searches all models). 
+ */ +@Path("/huggingface") +@Produces(Array(MediaType.APPLICATION_JSON)) +class HuggingFaceModelResource { + + import HuggingFaceModelResource._ + + @GET + @Path("/models") + def listModels( + @QueryParam("task") @DefaultValue("text-generation") task: String, + @QueryParam("search") search: String + ): Response = { + try { + val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("") + + // ── Search mode: forward query to HF Hub, return results directly ── + if (search != null && search.trim.nonEmpty) { + return fetchSearchResults(task, search.trim, hfToken) + } + + // ── Browse mode: return ALL models for this task (cached) ── + val cached = modelCache.get(task) + if (cached != null) { + return Response.ok(cached).build() + } + + // Not cached — fetch all pages from HF Hub API + val allModels = fetchAllModelsForTask(task, hfToken) + val json = objectMapper.writeValueAsString(allModels) + modelCache.put(task, json) + + Response.ok(json).build() + + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to fetch models: ${e.getMessage}"}""") + .build() + } + } + + @POST + @Path("/upload-audio") + @Consumes(Array(MediaType.WILDCARD)) + def uploadAudioReference( + @QueryParam("filename") filename: String, + bytes: Array[Byte] + ): Response = { + try { + if (bytes == null || bytes.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Audio payload is empty."}""") + .build() + } + + val safeFileName = Option(filename) + .map(_.trim) + .filter(_.nonEmpty) + .map(name => Paths.get(name).getFileName.toString) + .getOrElse("audio.bin") + val extension = { + val idx = safeFileName.lastIndexOf('.') + if (idx >= 0 && idx < safeFileName.length - 1) safeFileName.substring(idx) else ".bin" + } Review Comment: There should be an allow list of extensions to prevent users or attackers from uploading files such as bash scripts. 
########## amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala: ########## @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.resource + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import kong.unirest.Unirest + +import javax.ws.rs._ +import javax.ws.rs.core.{MediaType, Response} +import java.nio.file.{Files, Path => NioPath, Paths} +import java.util.concurrent.ConcurrentHashMap +import java.util.stream.Collectors +import scala.jdk.CollectionConverters._ + +/** + * REST resource that proxies the Hugging Face Hub API to list + * models for the HuggingFace operator. + * + * Browse mode: GET /api/huggingface/models?task=text-generation + * Fetches ALL models for the task from HF Hub (paginated internally), + * caches the full list server-side, and returns it. + * + * Search mode: GET /api/huggingface/models?task=text-generation&search=bert + * Forwards the search query to HF Hub API (searches all models). 
+ */ +@Path("/huggingface") +@Produces(Array(MediaType.APPLICATION_JSON)) +class HuggingFaceModelResource { + + import HuggingFaceModelResource._ + + @GET + @Path("/models") + def listModels( + @QueryParam("task") @DefaultValue("text-generation") task: String, + @QueryParam("search") search: String + ): Response = { + try { + val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("") + + // ── Search mode: forward query to HF Hub, return results directly ── + if (search != null && search.trim.nonEmpty) { + return fetchSearchResults(task, search.trim, hfToken) + } + + // ── Browse mode: return ALL models for this task (cached) ── + val cached = modelCache.get(task) + if (cached != null) { + return Response.ok(cached).build() + } + + // Not cached — fetch all pages from HF Hub API + val allModels = fetchAllModelsForTask(task, hfToken) + val json = objectMapper.writeValueAsString(allModels) + modelCache.put(task, json) + + Response.ok(json).build() + + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to fetch models: ${e.getMessage}"}""") + .build() + } + } + + @POST + @Path("/upload-audio") + @Consumes(Array(MediaType.WILDCARD)) + def uploadAudioReference( + @QueryParam("filename") filename: String, + bytes: Array[Byte] + ): Response = { + try { + if (bytes == null || bytes.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Audio payload is empty."}""") + .build() + } + + val safeFileName = Option(filename) + .map(_.trim) + .filter(_.nonEmpty) + .map(name => Paths.get(name).getFileName.toString) + .getOrElse("audio.bin") + val extension = { + val idx = safeFileName.lastIndexOf('.') + if (idx >= 0 && idx < safeFileName.length - 1) safeFileName.substring(idx) else ".bin" + } + + val tempDir = Paths.get(System.getProperty("java.io.tmpdir"), "texera-hf-audio") + Files.createDirectories(tempDir) + val tempFile: NioPath = Files.createTempFile(tempDir, 
"hf-audio-", extension) + Files.write(tempFile, bytes) + + val json = objectMapper.writeValueAsString( + Map( + "path" -> tempFile.toAbsolutePath.toString, + "fileName" -> safeFileName + ).asJava + ) + Response.ok(json).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to upload audio: ${e.getMessage}"}""") + .build() + } + } + + @GET + @Path("/audio-preview") + def previewUploadedAudio(@QueryParam("path") path: String): Response = { + try { + val trimmedPath = Option(path).map(_.trim).getOrElse("") + if (trimmedPath.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Audio path is required."}""") + .build() + } + + val tempDir = Paths + .get(System.getProperty("java.io.tmpdir"), "texera-hf-audio") + .toAbsolutePath + .normalize() + val requestedPath = Paths.get(trimmedPath).toAbsolutePath.normalize() + if (!requestedPath.startsWith(tempDir)) { + return Response + .status(Response.Status.FORBIDDEN) + .entity("""{"error":"Audio path is outside the allowed preview directory."}""") + .build() + } + if (!Files.exists(requestedPath) || !Files.isRegularFile(requestedPath)) { + return Response + .status(Response.Status.NOT_FOUND) + .entity("""{"error":"Uploaded audio file was not found."}""") + .build() + } + + val contentType = Option(Files.probeContentType(requestedPath)) + .filter(_.trim.nonEmpty) + .getOrElse(inferAudioContentType(requestedPath)) + Response.ok(Files.readAllBytes(requestedPath), contentType).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to read uploaded audio: ${e.getMessage}"}""") + .build() + } + } + + @GET + @Path("/media-proxy") + def proxyRemoteMedia(@QueryParam("url") url: String): Response = { + try { + val trimmedUrl = Option(url).map(_.trim).getOrElse("") + if (trimmedUrl.isEmpty) { + return Response + 
.status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Media URL is required."}""") + .build() + } + if (!trimmedUrl.startsWith("http://") && !trimmedUrl.startsWith("https://")) { Review Comment: The endpoint will fetch any URL it is given, allowing an attacker to reach internal services. An allowlist should be implemented to avoid this issue. ########## amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala: ########## @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.resource + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import kong.unirest.Unirest + +import javax.ws.rs._ +import javax.ws.rs.core.{MediaType, Response} +import java.nio.file.{Files, Path => NioPath, Paths} +import java.util.concurrent.ConcurrentHashMap +import java.util.stream.Collectors +import scala.jdk.CollectionConverters._ + +/** + * REST resource that proxies the Hugging Face Hub API to list + * models for the HuggingFace operator. 
+ * + * Browse mode: GET /api/huggingface/models?task=text-generation + * Fetches ALL models for the task from HF Hub (paginated internally), + * caches the full list server-side, and returns it. + * + * Search mode: GET /api/huggingface/models?task=text-generation&search=bert + * Forwards the search query to HF Hub API (searches all models). + */ +@Path("/huggingface") +@Produces(Array(MediaType.APPLICATION_JSON)) +class HuggingFaceModelResource { + + import HuggingFaceModelResource._ + + @GET + @Path("/models") + def listModels( + @QueryParam("task") @DefaultValue("text-generation") task: String, + @QueryParam("search") search: String + ): Response = { + try { + val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("") + + // ── Search mode: forward query to HF Hub, return results directly ── + if (search != null && search.trim.nonEmpty) { + return fetchSearchResults(task, search.trim, hfToken) + } + + // ── Browse mode: return ALL models for this task (cached) ── + val cached = modelCache.get(task) + if (cached != null) { + return Response.ok(cached).build() + } + + // Not cached — fetch all pages from HF Hub API + val allModels = fetchAllModelsForTask(task, hfToken) + val json = objectMapper.writeValueAsString(allModels) + modelCache.put(task, json) + + Response.ok(json).build() + + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to fetch models: ${e.getMessage}"}""") + .build() + } + } + + @POST + @Path("/upload-audio") + @Consumes(Array(MediaType.WILDCARD)) + def uploadAudioReference( + @QueryParam("filename") filename: String, + bytes: Array[Byte] + ): Response = { + try { + if (bytes == null || bytes.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Audio payload is empty."}""") + .build() + } + + val safeFileName = Option(filename) + .map(_.trim) + .filter(_.nonEmpty) + .map(name => Paths.get(name).getFileName.toString) + 
.getOrElse("audio.bin") + val extension = { + val idx = safeFileName.lastIndexOf('.') + if (idx >= 0 && idx < safeFileName.length - 1) safeFileName.substring(idx) else ".bin" + } + + val tempDir = Paths.get(System.getProperty("java.io.tmpdir"), "texera-hf-audio") + Files.createDirectories(tempDir) + val tempFile: NioPath = Files.createTempFile(tempDir, "hf-audio-", extension) + Files.write(tempFile, bytes) + + val json = objectMapper.writeValueAsString( + Map( + "path" -> tempFile.toAbsolutePath.toString, + "fileName" -> safeFileName + ).asJava + ) + Response.ok(json).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to upload audio: ${e.getMessage}"}""") + .build() + } + } + + @GET + @Path("/audio-preview") + def previewUploadedAudio(@QueryParam("path") path: String): Response = { + try { + val trimmedPath = Option(path).map(_.trim).getOrElse("") + if (trimmedPath.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Audio path is required."}""") + .build() + } + + val tempDir = Paths + .get(System.getProperty("java.io.tmpdir"), "texera-hf-audio") + .toAbsolutePath + .normalize() + val requestedPath = Paths.get(trimmedPath).toAbsolutePath.normalize() + if (!requestedPath.startsWith(tempDir)) { + return Response + .status(Response.Status.FORBIDDEN) + .entity("""{"error":"Audio path is outside the allowed preview directory."}""") + .build() + } + if (!Files.exists(requestedPath) || !Files.isRegularFile(requestedPath)) { + return Response + .status(Response.Status.NOT_FOUND) + .entity("""{"error":"Uploaded audio file was not found."}""") + .build() + } + + val contentType = Option(Files.probeContentType(requestedPath)) + .filter(_.trim.nonEmpty) + .getOrElse(inferAudioContentType(requestedPath)) + Response.ok(Files.readAllBytes(requestedPath), contentType).build() + } catch { + case e: Exception => + Response + 
.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to read uploaded audio: ${e.getMessage}"}""") + .build() + } + } + + @GET + @Path("/media-proxy") + def proxyRemoteMedia(@QueryParam("url") url: String): Response = { + try { + val trimmedUrl = Option(url).map(_.trim).getOrElse("") + if (trimmedUrl.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Media URL is required."}""") + .build() + } + if (!trimmedUrl.startsWith("http://") && !trimmedUrl.startsWith("https://")) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Only http(s) media URLs are supported."}""") + .build() + } + + val upstreamResponse = Unirest + .get(trimmedUrl) + .connectTimeout(10000) + .socketTimeout(120000) + .asBytes() + + if (upstreamResponse.getStatus != 200) { + return Response + .status(upstreamResponse.getStatus) + .entity( + s"""{"error":"Failed to fetch remote media: ${upstreamResponse.getStatusText}"}""" + ) + .build() + } + + val contentType = Option(upstreamResponse.getHeaders.getFirst("Content-Type")) + .filter(_.trim.nonEmpty) + .getOrElse(MediaType.APPLICATION_OCTET_STREAM) + Response.ok(upstreamResponse.getBody, contentType).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to proxy remote media: ${e.getMessage}"}""") + .build() + } + } + + /** Search HF Hub for models matching a query within a task. 
*/ + private def fetchSearchResults(task: String, query: String, hfToken: String): Response = { + var request = Unirest + .get("https://huggingface.co/api/models") + .queryString("pipeline_tag", task) + .queryString("sort", "downloads") + .queryString("direction", "-1") + .queryString("limit", "100") + .queryString("filter", task) + .queryString("inference", "warm") + .queryString("search", query) + + if (hfToken.nonEmpty) { + request = request.header("Authorization", s"Bearer $hfToken") + } + + val hfResponse = request.asString() + + if (hfResponse.getStatus != 200) { + return Response + .status(hfResponse.getStatus) + .entity(s"""{"error":"Hugging Face API error: ${hfResponse.getStatusText}"}""") + .build() + } + + val rawModels = objectMapper.readValue(hfResponse.getBody, listOfMapsType) + val out = buildSimplifiedList(rawModels) + Response.ok(objectMapper.writeValueAsString(out)).build() + } + + /** + * Fetch pipeline task tags from the Hugging Face Hub API. + * GET /api/huggingface/tasks + * + * Returns a JSON array of objects: [{ "tag": "text-generation", "label": "Text Generation" }, ...] + * The result is cached server-side for the lifetime of the process. + */ + @GET + @Path("/tasks") + def listTasks(): Response = { + try { + val cached = taskCache.get("all") + if (cached != null) { + return Response.ok(cached).build() + } + + val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("") + var request = Unirest + .get("https://huggingface.co/api/tasks") + .connectTimeout(10000) + .socketTimeout(15000) + + if (hfToken.nonEmpty) { + request = request.header("Authorization", s"Bearer $hfToken") + } + + val hfResponse = request.asString() + + if (hfResponse.getStatus != 200) { + return Response + .status(hfResponse.getStatus) + .entity(s"""{"error":"Hugging Face API error: ${hfResponse.getStatusText}"}""") + .build() + } + + // /api/tasks returns a JSON object: { "<pipeline_tag>": { "label": "...", ... }, ... 
} + // Using readTree so no entry is dropped regardless of its value type (null, array, etc.) + val root: JsonNode = objectMapper.readTree(hfResponse.getBody) + val taskList = new java.util.ArrayList[java.util.Map[String, Object]]() + val iter = root.fields() + while (iter.hasNext) { + val entry = iter.next() + val tag = entry.getKey + val info: JsonNode = entry.getValue + val label = + if (info != null && info.isObject && info.has("label")) info.get("label").asText(tag) + else tag + val taskEntry = new java.util.LinkedHashMap[String, Object]() + taskEntry.put("tag", tag) + taskEntry.put("label", label) + taskList.add(taskEntry) + } + + // Filter out tasks that have no models available with hosted inference + val availableTasks = taskList + .parallelStream() + .filter(task => hasModelsForTask(task.get("tag").toString, hfToken)) + .collect(Collectors.toList()) + + val json = objectMapper.writeValueAsString(availableTasks) + taskCache.put("all", json) + Response.ok(json).build() + + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to fetch tasks: ${e.getMessage}"}""") + .build() + } + } + + /** + * Fetch ALL models for a given task by paginating through the HF Hub API. + * HF Hub uses a Link header with rel="next" for pagination. + * We fetch pages of 1000 models at a time, up to MAX_PAGES pages. 
+ */ + private def fetchAllModelsForTask( + task: String, + hfToken: String + ): java.util.List[java.util.Map[String, Object]] = { + val allResults = new java.util.ArrayList[java.util.Map[String, Object]]() + var nextUrl: String = null + var pageCount = 0 + + // First request — inference=warm limits to models with hosted Inference API + var request = Unirest + .get("https://huggingface.co/api/models") + .queryString("pipeline_tag", task) + .queryString("sort", "downloads") + .queryString("direction", "-1") + .queryString("limit", PAGE_SIZE.toString) + .queryString("filter", task) + .queryString("inference", "warm") + .connectTimeout(10000) + .socketTimeout(30000) + + if (hfToken.nonEmpty) { + request = request.header("Authorization", s"Bearer $hfToken") + } + + var hfResponse = request.asString() + + if (hfResponse.getStatus != 200) { + throw new RuntimeException( + s"HF API returned ${hfResponse.getStatus}: ${hfResponse.getStatusText}" + ) + } + + var rawModels = objectMapper.readValue(hfResponse.getBody, listOfMapsType) + allResults.addAll(buildSimplifiedList(rawModels)) + pageCount += 1 + + // Extract next page URL from Link header + nextUrl = extractNextLink(hfResponse.getHeaders.getFirst("Link")) + + // Fetch remaining pages until exhausted + while (nextUrl != null) { + var nextRequest = Unirest + .get(nextUrl) + .connectTimeout(10000) + .socketTimeout(30000) + if (hfToken.nonEmpty) { + nextRequest = nextRequest.header("Authorization", s"Bearer $hfToken") + } + + hfResponse = nextRequest.asString() + + if (hfResponse.getStatus != 200) { + // Stop paginating on error, return what we have so far + return allResults + } Review Comment: Possibly add a message or log entry indicating that an error occurred, rather than caching and returning an incomplete list. 
########## amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala: ########## @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.resource + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import kong.unirest.Unirest + +import javax.ws.rs._ +import javax.ws.rs.core.{MediaType, Response} +import java.nio.file.{Files, Path => NioPath, Paths} +import java.util.concurrent.ConcurrentHashMap +import java.util.stream.Collectors +import scala.jdk.CollectionConverters._ + +/** + * REST resource that proxies the Hugging Face Hub API to list + * models for the HuggingFace operator. + * + * Browse mode: GET /api/huggingface/models?task=text-generation + * Fetches ALL models for the task from HF Hub (paginated internally), + * caches the full list server-side, and returns it. + * + * Search mode: GET /api/huggingface/models?task=text-generation&search=bert + * Forwards the search query to HF Hub API (searches all models). 
+ */ +@Path("/huggingface") +@Produces(Array(MediaType.APPLICATION_JSON)) +class HuggingFaceModelResource { + + import HuggingFaceModelResource._ + + @GET + @Path("/models") + def listModels( + @QueryParam("task") @DefaultValue("text-generation") task: String, + @QueryParam("search") search: String + ): Response = { + try { + val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("") + + // ── Search mode: forward query to HF Hub, return results directly ── + if (search != null && search.trim.nonEmpty) { + return fetchSearchResults(task, search.trim, hfToken) + } + + // ── Browse mode: return ALL models for this task (cached) ── + val cached = modelCache.get(task) + if (cached != null) { + return Response.ok(cached).build() + } + + // Not cached — fetch all pages from HF Hub API + val allModels = fetchAllModelsForTask(task, hfToken) + val json = objectMapper.writeValueAsString(allModels) + modelCache.put(task, json) Review Comment: The model cache needs cleanup logic, eviction policy, and a size limit. The current design only reads and puts the models in cache. ########## amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala: ########## @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.resource + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import kong.unirest.Unirest + +import javax.ws.rs._ +import javax.ws.rs.core.{MediaType, Response} +import java.nio.file.{Files, Path => NioPath, Paths} +import java.util.concurrent.ConcurrentHashMap +import java.util.stream.Collectors +import scala.jdk.CollectionConverters._ + +/** + * REST resource that proxies the Hugging Face Hub API to list + * models for the HuggingFace operator. + * + * Browse mode: GET /api/huggingface/models?task=text-generation + * Fetches ALL models for the task from HF Hub (paginated internally), + * caches the full list server-side, and returns it. + * + * Search mode: GET /api/huggingface/models?task=text-generation&search=bert + * Forwards the search query to HF Hub API (searches all models). 
+ */ +@Path("/huggingface") +@Produces(Array(MediaType.APPLICATION_JSON)) +class HuggingFaceModelResource { + + import HuggingFaceModelResource._ + + @GET + @Path("/models") + def listModels( + @QueryParam("task") @DefaultValue("text-generation") task: String, + @QueryParam("search") search: String + ): Response = { + try { + val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("") + + // ── Search mode: forward query to HF Hub, return results directly ── + if (search != null && search.trim.nonEmpty) { + return fetchSearchResults(task, search.trim, hfToken) + } + + // ── Browse mode: return ALL models for this task (cached) ── + val cached = modelCache.get(task) + if (cached != null) { + return Response.ok(cached).build() + } + + // Not cached — fetch all pages from HF Hub API + val allModels = fetchAllModelsForTask(task, hfToken) + val json = objectMapper.writeValueAsString(allModels) + modelCache.put(task, json) + + Response.ok(json).build() + + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to fetch models: ${e.getMessage}"}""") + .build() + } + } + + @POST + @Path("/upload-audio") + @Consumes(Array(MediaType.WILDCARD)) + def uploadAudioReference( + @QueryParam("filename") filename: String, + bytes: Array[Byte] + ): Response = { + try { + if (bytes == null || bytes.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Audio payload is empty."}""") + .build() + } + + val safeFileName = Option(filename) + .map(_.trim) + .filter(_.nonEmpty) + .map(name => Paths.get(name).getFileName.toString) + .getOrElse("audio.bin") + val extension = { + val idx = safeFileName.lastIndexOf('.') + if (idx >= 0 && idx < safeFileName.length - 1) safeFileName.substring(idx) else ".bin" + } + + val tempDir = Paths.get(System.getProperty("java.io.tmpdir"), "texera-hf-audio") + Files.createDirectories(tempDir) + val tempFile: NioPath = Files.createTempFile(tempDir, 
"hf-audio-", extension) + Files.write(tempFile, bytes) + + val json = objectMapper.writeValueAsString( + Map( + "path" -> tempFile.toAbsolutePath.toString, + "fileName" -> safeFileName + ).asJava + ) + Response.ok(json).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to upload audio: ${e.getMessage}"}""") + .build() + } + } + + @GET + @Path("/audio-preview") + def previewUploadedAudio(@QueryParam("path") path: String): Response = { + try { + val trimmedPath = Option(path).map(_.trim).getOrElse("") + if (trimmedPath.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Audio path is required."}""") + .build() + } + + val tempDir = Paths + .get(System.getProperty("java.io.tmpdir"), "texera-hf-audio") + .toAbsolutePath + .normalize() + val requestedPath = Paths.get(trimmedPath).toAbsolutePath.normalize() + if (!requestedPath.startsWith(tempDir)) { + return Response + .status(Response.Status.FORBIDDEN) + .entity("""{"error":"Audio path is outside the allowed preview directory."}""") + .build() + } + if (!Files.exists(requestedPath) || !Files.isRegularFile(requestedPath)) { + return Response + .status(Response.Status.NOT_FOUND) + .entity("""{"error":"Uploaded audio file was not found."}""") + .build() + } + + val contentType = Option(Files.probeContentType(requestedPath)) + .filter(_.trim.nonEmpty) + .getOrElse(inferAudioContentType(requestedPath)) + Response.ok(Files.readAllBytes(requestedPath), contentType).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to read uploaded audio: ${e.getMessage}"}""") + .build() + } + } + + @GET + @Path("/media-proxy") + def proxyRemoteMedia(@QueryParam("url") url: String): Response = { + try { + val trimmedUrl = Option(url).map(_.trim).getOrElse("") + if (trimmedUrl.isEmpty) { + return Response + 
.status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Media URL is required."}""") + .build() + } + if (!trimmedUrl.startsWith("http://") && !trimmedUrl.startsWith("https://")) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Only http(s) media URLs are supported."}""") + .build() + } + + val upstreamResponse = Unirest + .get(trimmedUrl) + .connectTimeout(10000) + .socketTimeout(120000) + .asBytes() + + if (upstreamResponse.getStatus != 200) { + return Response + .status(upstreamResponse.getStatus) + .entity( + s"""{"error":"Failed to fetch remote media: ${upstreamResponse.getStatusText}"}""" + ) + .build() + } + + val contentType = Option(upstreamResponse.getHeaders.getFirst("Content-Type")) + .filter(_.trim.nonEmpty) + .getOrElse(MediaType.APPLICATION_OCTET_STREAM) + Response.ok(upstreamResponse.getBody, contentType).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to proxy remote media: ${e.getMessage}"}""") + .build() + } + } + + /** Search HF Hub for models matching a query within a task. */ + private def fetchSearchResults(task: String, query: String, hfToken: String): Response = { + var request = Unirest + .get("https://huggingface.co/api/models") + .queryString("pipeline_tag", task) + .queryString("sort", "downloads") + .queryString("direction", "-1") + .queryString("limit", "100") Review Comment: There should be a pagination loop or a way to let users know that they are viewing a truncated list. ########## amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala: ########## @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.resource + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import kong.unirest.Unirest + +import javax.ws.rs._ +import javax.ws.rs.core.{MediaType, Response} +import java.nio.file.{Files, Path => NioPath, Paths} +import java.util.concurrent.ConcurrentHashMap +import java.util.stream.Collectors +import scala.jdk.CollectionConverters._ + +/** + * REST resource that proxies the Hugging Face Hub API to list + * models for the HuggingFace operator. + * + * Browse mode: GET /api/huggingface/models?task=text-generation + * Fetches ALL models for the task from HF Hub (paginated internally), + * caches the full list server-side, and returns it. + * + * Search mode: GET /api/huggingface/models?task=text-generation&search=bert + * Forwards the search query to HF Hub API (searches all models). 
+ */ +@Path("/huggingface") +@Produces(Array(MediaType.APPLICATION_JSON)) +class HuggingFaceModelResource { + + import HuggingFaceModelResource._ + + @GET + @Path("/models") + def listModels( + @QueryParam("task") @DefaultValue("text-generation") task: String, + @QueryParam("search") search: String + ): Response = { + try { + val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("") + + // ── Search mode: forward query to HF Hub, return results directly ── + if (search != null && search.trim.nonEmpty) { + return fetchSearchResults(task, search.trim, hfToken) + } + + // ── Browse mode: return ALL models for this task (cached) ── + val cached = modelCache.get(task) + if (cached != null) { + return Response.ok(cached).build() + } + + // Not cached — fetch all pages from HF Hub API + val allModels = fetchAllModelsForTask(task, hfToken) + val json = objectMapper.writeValueAsString(allModels) + modelCache.put(task, json) + + Response.ok(json).build() + + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to fetch models: ${e.getMessage}"}""") + .build() + } + } + + @POST + @Path("/upload-audio") + @Consumes(Array(MediaType.WILDCARD)) + def uploadAudioReference( + @QueryParam("filename") filename: String, + bytes: Array[Byte] + ): Response = { + try { + if (bytes == null || bytes.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Audio payload is empty."}""") + .build() + } + + val safeFileName = Option(filename) + .map(_.trim) + .filter(_.nonEmpty) + .map(name => Paths.get(name).getFileName.toString) + .getOrElse("audio.bin") + val extension = { + val idx = safeFileName.lastIndexOf('.') + if (idx >= 0 && idx < safeFileName.length - 1) safeFileName.substring(idx) else ".bin" + } + + val tempDir = Paths.get(System.getProperty("java.io.tmpdir"), "texera-hf-audio") + Files.createDirectories(tempDir) + val tempFile: NioPath = Files.createTempFile(tempDir, 
"hf-audio-", extension) + Files.write(tempFile, bytes) + + val json = objectMapper.writeValueAsString( + Map( + "path" -> tempFile.toAbsolutePath.toString, + "fileName" -> safeFileName + ).asJava + ) + Response.ok(json).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to upload audio: ${e.getMessage}"}""") + .build() + } + } + + @GET + @Path("/audio-preview") + def previewUploadedAudio(@QueryParam("path") path: String): Response = { + try { + val trimmedPath = Option(path).map(_.trim).getOrElse("") + if (trimmedPath.isEmpty) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Audio path is required."}""") + .build() + } + + val tempDir = Paths + .get(System.getProperty("java.io.tmpdir"), "texera-hf-audio") + .toAbsolutePath + .normalize() + val requestedPath = Paths.get(trimmedPath).toAbsolutePath.normalize() + if (!requestedPath.startsWith(tempDir)) { + return Response + .status(Response.Status.FORBIDDEN) + .entity("""{"error":"Audio path is outside the allowed preview directory."}""") + .build() + } + if (!Files.exists(requestedPath) || !Files.isRegularFile(requestedPath)) { + return Response + .status(Response.Status.NOT_FOUND) + .entity("""{"error":"Uploaded audio file was not found."}""") + .build() + } + + val contentType = Option(Files.probeContentType(requestedPath)) + .filter(_.trim.nonEmpty) + .getOrElse(inferAudioContentType(requestedPath)) + Response.ok(Files.readAllBytes(requestedPath), contentType).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to read uploaded audio: ${e.getMessage}"}""") + .build() + } + } + + @GET + @Path("/media-proxy") + def proxyRemoteMedia(@QueryParam("url") url: String): Response = { + try { + val trimmedUrl = Option(url).map(_.trim).getOrElse("") + if (trimmedUrl.isEmpty) { + return Response + 
.status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Media URL is required."}""") + .build() + } + if (!trimmedUrl.startsWith("http://") && !trimmedUrl.startsWith("https://")) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("""{"error":"Only http(s) media URLs are supported."}""") + .build() + } + + val upstreamResponse = Unirest + .get(trimmedUrl) + .connectTimeout(10000) + .socketTimeout(120000) + .asBytes() + + if (upstreamResponse.getStatus != 200) { + return Response + .status(upstreamResponse.getStatus) + .entity( + s"""{"error":"Failed to fetch remote media: ${upstreamResponse.getStatusText}"}""" + ) + .build() + } + + val contentType = Option(upstreamResponse.getHeaders.getFirst("Content-Type")) + .filter(_.trim.nonEmpty) + .getOrElse(MediaType.APPLICATION_OCTET_STREAM) + Response.ok(upstreamResponse.getBody, contentType).build() + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to proxy remote media: ${e.getMessage}"}""") + .build() + } + } + + /** Search HF Hub for models matching a query within a task. 
*/ + private def fetchSearchResults(task: String, query: String, hfToken: String): Response = { + var request = Unirest + .get("https://huggingface.co/api/models") + .queryString("pipeline_tag", task) + .queryString("sort", "downloads") + .queryString("direction", "-1") + .queryString("limit", "100") + .queryString("filter", task) + .queryString("inference", "warm") + .queryString("search", query) + + if (hfToken.nonEmpty) { + request = request.header("Authorization", s"Bearer $hfToken") + } + + val hfResponse = request.asString() + + if (hfResponse.getStatus != 200) { + return Response + .status(hfResponse.getStatus) + .entity(s"""{"error":"Hugging Face API error: ${hfResponse.getStatusText}"}""") + .build() + } + + val rawModels = objectMapper.readValue(hfResponse.getBody, listOfMapsType) + val out = buildSimplifiedList(rawModels) + Response.ok(objectMapper.writeValueAsString(out)).build() + } + + /** + * Fetch pipeline task tags from the Hugging Face Hub API. + * GET /api/huggingface/tasks + * + * Returns a JSON array of objects: [{ "tag": "text-generation", "label": "Text Generation" }, ...] + * The result is cached server-side for the lifetime of the process. + */ + @GET + @Path("/tasks") + def listTasks(): Response = { + try { + val cached = taskCache.get("all") + if (cached != null) { + return Response.ok(cached).build() + } + + val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("") + var request = Unirest + .get("https://huggingface.co/api/tasks") + .connectTimeout(10000) + .socketTimeout(15000) + + if (hfToken.nonEmpty) { + request = request.header("Authorization", s"Bearer $hfToken") + } + + val hfResponse = request.asString() + + if (hfResponse.getStatus != 200) { + return Response + .status(hfResponse.getStatus) + .entity(s"""{"error":"Hugging Face API error: ${hfResponse.getStatusText}"}""") + .build() + } + + // /api/tasks returns a JSON object: { "<pipeline_tag>": { "label": "...", ... }, ... 
} + // Using readTree so no entry is dropped regardless of its value type (null, array, etc.) + val root: JsonNode = objectMapper.readTree(hfResponse.getBody) + val taskList = new java.util.ArrayList[java.util.Map[String, Object]]() + val iter = root.fields() + while (iter.hasNext) { + val entry = iter.next() + val tag = entry.getKey + val info: JsonNode = entry.getValue + val label = + if (info != null && info.isObject && info.has("label")) info.get("label").asText(tag) + else tag + val taskEntry = new java.util.LinkedHashMap[String, Object]() + taskEntry.put("tag", tag) + taskEntry.put("label", label) + taskList.add(taskEntry) + } + + // Filter out tasks that have no models available with hosted inference + val availableTasks = taskList + .parallelStream() + .filter(task => hasModelsForTask(task.get("tag").toString, hfToken)) + .collect(Collectors.toList()) Review Comment: I think a good solution is to limit parallel executions, add a try-catch block, and display an error for potential 429 or 503 responses (e.g., Rate limit reached). ########## amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala: ########## @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.web.resource + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import kong.unirest.Unirest + +import javax.ws.rs._ +import javax.ws.rs.core.{MediaType, Response} +import java.nio.file.{Files, Path => NioPath, Paths} +import java.util.concurrent.ConcurrentHashMap +import java.util.stream.Collectors +import scala.jdk.CollectionConverters._ + +/** + * REST resource that proxies the Hugging Face Hub API to list + * models for the HuggingFace operator. + * + * Browse mode: GET /api/huggingface/models?task=text-generation + * Fetches ALL models for the task from HF Hub (paginated internally), + * caches the full list server-side, and returns it. + * + * Search mode: GET /api/huggingface/models?task=text-generation&search=bert + * Forwards the search query to HF Hub API (searches all models). + */ +@Path("/huggingface") +@Produces(Array(MediaType.APPLICATION_JSON)) +class HuggingFaceModelResource { + + import HuggingFaceModelResource._ + + @GET + @Path("/models") + def listModels( + @QueryParam("task") @DefaultValue("text-generation") task: String, + @QueryParam("search") search: String + ): Response = { + try { + val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("") + + // ── Search mode: forward query to HF Hub, return results directly ── + if (search != null && search.trim.nonEmpty) { + return fetchSearchResults(task, search.trim, hfToken) + } + + // ── Browse mode: return ALL models for this task (cached) ── + val cached = modelCache.get(task) + if (cached != null) { + return Response.ok(cached).build() + } + + // Not cached — fetch all pages from HF Hub API + val allModels = fetchAllModelsForTask(task, hfToken) + val json = objectMapper.writeValueAsString(allModels) + modelCache.put(task, json) + + Response.ok(json).build() + + } catch { + case e: Exception => + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"Failed to 
fetch models: ${e.getMessage}"}""") + .build() + } Review Comment: The current design returns the exact exception message, which exposes internal details to users. Suggestions: 1. Have Jackson handle escaping instead of concatenating the strings: ``` import scala.jdk.CollectionConverters._ private def errorJson(message: String): String = { objectMapper.writeValueAsString(Map("error" -> message).asJava) } ``` 2. Don't expose e.getMessage to users; return a generic message instead (do this for all the try/catch blocks): ``` } catch { case e: Exception => logger.error("Model fetch failed", e) Response .status(Response.Status.INTERNAL_SERVER_ERROR) .entity(errorJson("Failed to fetch models.")) .build() } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
