He-Pin commented on code in PR #1724: URL: https://github.com/apache/pekko/pull/1724#discussion_r1922755319
########## actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala: ########## @@ -57,19 +76,38 @@ private[dispatch] object VirtualThreadSupport { } } - def newThreadPerTaskExecutor(threadFactory: ThreadFactory): ExecutorService = { - require(threadFactory != null, "threadFactory should not be null.") + /** + * Create a virtual thread factory with the specified executor as the scheduler of virtual thread. + */ + def newVirtualThreadFactory(prefix: String, executor: ExecutorService): ThreadFactory = try { - val executorsClazz = ClassLoader.getSystemClassLoader.loadClass("java.util.concurrent.Executors") - val newThreadPerTaskExecutorMethod = lookup.findStatic( - executorsClazz, - "newThreadPerTaskExecutor", - MethodType.methodType(classOf[ExecutorService], classOf[ThreadFactory])) - newThreadPerTaskExecutorMethod.invoke(threadFactory).asInstanceOf[ExecutorService] + val builderClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder") + val ofVirtualClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder$OfVirtual") + val ofVirtualMethod = classOf[Thread].getDeclaredMethod("ofVirtual") + var builder = ofVirtualMethod.invoke(null) + if (executor != null) { + val clazz = builder.getClass + val field = clazz.getDeclaredField("scheduler") + field.setAccessible(true) + field.set(builder, executor) + } + val nameMethod = ofVirtualClass.getDeclaredMethod("name", classOf[String], classOf[Long]) + val factoryMethod = builderClass.getDeclaredMethod("factory") + val zero = java.lang.Long.valueOf(0L) + builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", zero) + factoryMethod.invoke(builder).asInstanceOf[ThreadFactory] } catch { case NonFatal(e) => // --add-opens java.base/java.lang=ALL-UNNAMED - throw new UnsupportedOperationException("Failed to create newThreadPerTaskExecutor.", e) + throw new UnsupportedOperationException("Failed to create virtual thread factory", e) + } + + object CarrieThreadFactory extends ForkJoinPool.ForkJoinWorkerThreadFactory { + private val clazz = ClassLoader.getSystemClassLoader.loadClass("jdk.internal.misc.CarrierThread") + // TODO lookup.findClass is only available in Java 9 + private val constructor = clazz.getDeclaredConstructor(classOf[ForkJoinPool]) + override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = { + constructor.newInstance(pool).asInstanceOf[ForkJoinWorkerThread] Review Comment: Use java 8 reflect for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org For additional commands, e-mail: notifications-h...@pekko.apache.org