vertx-virtual-threads-incubator's Introduction

Vert.x Virtual Threads Incubator

Incubator for virtual threads based prototypes.

Prerequisites

Projects

Usage

The --enable-preview flag must be enabled for both compilation and tests:

<build>
  <pluginManagement>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <release>19</release>
          <compilerArgs>
            <arg>--enable-preview</arg>
          </compilerArgs>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <configuration>
          <argLine>--enable-preview</argLine>
        </configuration>
      </plugin>
    </plugins>
  </pluginManagement>
</build>

Snapshots are available at s01.oss.sonatype.org:

<repositories>
  <repository>
    <id>vertx-snapshots-repository</id>
    <name>Vert.x Snapshots Repository</name>
    <url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
    <releases>
      <enabled>false</enabled>
    </releases>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

vertx-virtual-threads-incubator's People

Contributors

augustnagro, franz1981, pmlopes, vietj

vertx-virtual-threads-incubator's Issues

executeBlocking / blockingHandler support with virtual threads

Describe the feature

Extend existing async/await support to executeBlocking / blockingHandler threads

Use cases

A performance uplift (in theory) for any existing executeBlocking / blockingHandler code once it runs on virtual threads

Contribution

Who should implement this feature? Are you volunteering to implement it, or
do you know someone who is able and willing to implement it?

I don't know who should implement this feature, or who is willing to.

Async.await fails when the form type is multipart/form-data; with any other form submission everything works normally

Environment: jdk 19, vert.x 4.4.4

Request content:

POST /test HTTP/1.1
Authorization: 14288617be8a4bfa805e2e24cbff324a
User-Agent: PostmanRuntime/7.32.3
Accept: */*
Postman-Token: 0838b7ae-5f48-4f7e-adb1-40752b579440
Host: localhost:7001
Accept-Encoding: gzip, deflate, br
Connection: keep-alive
Content-Type: multipart/form-data; boundary=--------------------------892028311293841493076041
Cookie: satoken=fd6c1f2f-b47b-4fb4-839e-348707ff9772
Content-Length: 602061

----------------------------892028311293841493076041
Content-Disposition: form-data; name="12323"; filename="238870.jpg"
<238870.jpg>
----------------------------892028311293841493076041--

Exception:

java.lang.IllegalStateException
at io.vertx.await.impl.DefaultScheduler.unschedule(DefaultScheduler.java:77)
at io.vertx.await.impl.VirtualThreadContext.await(VirtualThreadContext.java:163)
at io.vertx.await.Async.await(Async.java:70)
at io.vertx.await.Async.await(Async.java:60)

Response response = await(VertxHolder.getVertxRedis().get(key));
if (null != response) {
  return response.toString();
}

The whole handler runs inside async.run(v -> {}).
If the form submission is not multipart/form-data, everything works normally.

State of this incubator

I am aware that this is probably not the right place to ask this question, but I cannot ask questions in the Google group for some reason.

I would just like to get some information on what the state of this incubator is. Can we expect to see a production-ready version of this at some point in the (near) future?

For me, the support for virtual threads and a more synchronous way of writing code will probably decide if I will aim to use vert.x or not in future projects. This is why this incubator is so interesting and promising to me.

Thank you very much for your attention!

Netty gRPC server fails with a NullPointerException on a ThreadLocal in a virtual thread context

Version

4.4.2 (latest)

Context

Suppose I create a netty grpc server on a virtual thread context. Observe the following error when the server is stopped:

java.lang.NullPointerException: Cannot invoke "java.util.List.remove(Object)" because the return value of "java.lang.ThreadLocal.get()" is null
	at io.vertx.grpc.VertxServer$ActualServer.lambda$stop$7(VertxServer.java:136)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
	at io.vertx.await.impl.VirtualThreadContext.lambda$run$1(VirtualThreadContext.java:102)
	at java.base/java.lang.VirtualThread.run(VirtualThread.java:305)
	at java.base/java.lang.VirtualThread$VThreadContinuation.lambda$new$0(VirtualThread.java:177)
	at java.base/jdk.internal.vm.Continuation.enter0(Continuation.java:327)
	at java.base/jdk.internal.vm.Continuation.enter(Continuation.java:320)

For context:

public class VertxServer extends Server {

  private static final ConcurrentMap<ServerID, ActualServer> map = new ConcurrentHashMap<>();

  private static class ActualServer {
...
    final ThreadLocal<List<ContextInternal>> contextLocal = new ThreadLocal<>();
...
    void stop(ContextInternal context, Promise<Void> promise) {
      boolean shutdown = count.decrementAndGet() == 0;
      context.runOnContext(v -> {
        group.removeWorker(context.nettyEventLoop());
        // ERROR LINE
        contextLocal.get().remove(context);
        if (shutdown) {
          map.remove(id);
          context.executeBlocking(p -> {
...

Not sure what the solution is here.
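
One possible reading of the error above, offered as an assumption rather than a confirmed diagnosis, is that the list was stored in contextLocal by one thread while the stop() callback runs on a different one (here a virtual thread), where ThreadLocal.get() returns null. The sketch below uses only the JDK to show that behaviour together with a null-safe removal. ThreadLocalVisibility, LOCAL, readFromOtherThread and removeIfPresent are hypothetical names, not part of vertx-grpc, and a plain platform thread is used so the example runs without --enable-preview (a virtual thread likewise has its own thread-local values).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class ThreadLocalVisibility {

  // Hypothetical stand-in for the contextLocal field shown above.
  static final ThreadLocal<List<String>> LOCAL = new ThreadLocal<>();

  /** Reads LOCAL from a freshly started thread and returns what that thread saw. */
  static List<String> readFromOtherThread() {
    AtomicReference<List<String>> seen = new AtomicReference<>();
    Thread other = new Thread(() -> seen.set(LOCAL.get()));
    other.start();
    try {
      other.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return seen.get();
  }

  /** Null-safe removal: does nothing when the calling thread has no list. */
  static boolean removeIfPresent(String value) {
    List<String> list = LOCAL.get();
    return list != null && list.remove(value);
  }

  public static void main(String[] args) {
    // The value is set on the main thread only.
    LOCAL.set(new ArrayList<>(List.of("ctx")));
    System.out.println("same thread:    " + LOCAL.get());
    System.out.println("other thread:   " + readFromOtherThread());
    System.out.println("guarded remove: " + removeIfPresent("ctx"));
  }
}
```

A guard like removeIfPresent only avoids the exception; whether skipping the removal is correct for VertxServer is a separate question that this sketch does not answer.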

Do you have a reproducer?

(Investigating)

java.lang.NullPointerException in EventLoopScheduler

Version

947f181

Context

I observed the following while load testing a complex application. The core part of this load test runs many SQL transactions. Each transaction awaits a Vertx JOOQ reactive client that executes a SQL query against a PgPool.

java.lang.NullPointerException: Cannot invoke "java.lang.Runnable.run()" because the return value of "java.util.LinkedList.poll()" is null
	at io.vertx.await.impl.EventLoopScheduler.lambda$new$1(EventLoopScheduler.java:75)
	at io.vertx.await.impl.EventLoopScheduler.lambda$new$0(EventLoopScheduler.java:60)
	at io.vertx.await.impl.EventLoopScheduler.lambda$new$2(EventLoopScheduler.java:74)
	at java.base/java.lang.VirtualThread.submitRunContinuation(VirtualThread.java:234)
	at java.base/java.lang.VirtualThread.submitRunContinuation(VirtualThread.java:253)
	at java.base/java.lang.VirtualThread.start(VirtualThread.java:462)
	at java.base/java.lang.VirtualThread.start(VirtualThread.java:475)
	at io.vertx.await.impl.EventLoopScheduler.execute(EventLoopScheduler.java:89)
	at io.vertx.await.impl.VirtualThreadContext.execute2(VirtualThreadContext.java:118)
	at io.vertx.await.impl.VirtualThreadContext.execute(VirtualThreadContext.java:67)
	at io.vertx.await.impl.VirtualThreadContext.execute(VirtualThreadContext.java:84)
	at io.vertx.core.impl.ContextBase.execute(ContextBase.java:225)
	at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:51)
	at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
	at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
	at io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)
	at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:41)
	at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)
	at io.vertx.sqlclient.impl.command.CommandResponse.fire(CommandResponse.java:46)
	at io.vertx.sqlclient.impl.SocketConnectionBase.handleMessage(SocketConnectionBase.java:292)
	at io.vertx.pgclient.impl.PgSocketConnection.handleMessage(PgSocketConnection.java:97)
	at io.vertx.sqlclient.impl.SocketConnectionBase.lambda$init$0(SocketConnectionBase.java:105)
	at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:55)
	at io.vertx.core.impl.ContextBase.emit(ContextBase.java:239)
	at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:390)
	at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:157)
	at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.vertx.pgclient.impl.codec.PgEncoder.lambda$write$0(PgEncoder.java:98)
	at io.vertx.pgclient.impl.codec.PgCommandCodec.handleReadyForQuery(PgCommandCodec.java:139)
	at io.vertx.pgclient.impl.codec.PgDecoder.decodeReadyForQuery(PgDecoder.java:237)
	at io.vertx.pgclient.impl.codec.PgDecoder.channelRead(PgDecoder.java:96)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1589)

For your convenience, the context of this code is:

I observed this issue again in load testing, now within a gRPC stub implementation:

Dec 22, 2022 4:32:39 PM io.grpc.internal.SerializingExecutor run
SEVERE: Exception while executing runnable io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable@3600a37f
java.lang.NullPointerException: Cannot invoke "java.lang.Runnable.run()" because the return value of "java.util.LinkedList.poll()" is null
	at io.vertx.await.impl.EventLoopScheduler.lambda$new$1(EventLoopScheduler.java:75)
	at io.vertx.await.impl.EventLoopScheduler.lambda$new$0(EventLoopScheduler.java:60)
	at io.vertx.await.impl.EventLoopScheduler.lambda$new$2(EventLoopScheduler.java:74)
	at java.base/java.lang.VirtualThread.submitRunContinuation(VirtualThread.java:234)
	at java.base/java.lang.VirtualThread.submitRunContinuation(VirtualThread.java:253)
	at java.base/java.lang.VirtualThread.start(VirtualThread.java:462)
	at java.base/java.lang.VirtualThread.start(VirtualThread.java:475)
	at io.vertx.await.impl.EventLoopScheduler.execute(EventLoopScheduler.java:89)
	at io.vertx.core.eventbus.impl.HandlerRegistration.receive(HandlerRegistration.java:46)
	at io.vertx.core.eventbus.impl.EventBusImpl.deliverMessageLocally(EventBusImpl.java:375)
	at io.vertx.core.eventbus.impl.EventBusImpl.sendLocally(EventBusImpl.java:341)
	at io.vertx.core.eventbus.impl.EventBusImpl.sendOrPub(EventBusImpl.java:329)
	at io.vertx.core.eventbus.impl.OutboundDeliveryContext.execute(OutboundDeliveryContext.java:109)
	at io.vertx.core.eventbus.impl.DeliveryContextBase.next(DeliveryContextBase.java:72)
	at io.vertx.core.eventbus.impl.OutboundDeliveryContext.next(OutboundDeliveryContext.java:28)
	at io.vertx.core.eventbus.impl.EventBusImpl.sendOrPubInternal(EventBusImpl.java:422)
	at io.vertx.core.eventbus.impl.EventBusImpl.sendOrPubInternal(EventBusImpl.java:428)
	at io.vertx.core.eventbus.impl.EventBusImpl.publish(EventBusImpl.java:164)
// context: this is forwarding grpc messages to the event bus
	at com.hiddenswitch.framework.impl.ServerGameContext.clientToServer(ServerGameContext.java:281)
	at com.hiddenswitch.framework.impl.ServerGameContext.lambda$subscribeGame$3(ServerGameContext.java:271)
// end context
	at io.vertx.grpc.stub.StreamObserverReadStream.onNext(StreamObserverReadStream.java:37)
	at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
	at io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
	at io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
	at com.avast.grpc.jwt.server.DelayedServerCallListener.onMessage(DelayedServerCallListener.java:17)
	at io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
	at com.be_hase.grpc.micrometer.MicrometerServerCallListener.onMessage(MicrometerServerCallListener.java:22)
	at io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
	at io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
	at io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
	at io.opentracing.contrib.grpc.TracingServerInterceptor$2.onMessage(TracingServerInterceptor.java:221)
	at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:318)
	at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:301)
	at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at io.vertx.grpc.VertxServer$ActualServer.lambda$null$0(VertxServer.java:96)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
	at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1589)
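
Both traces report that LinkedList.poll() returned null, which is what poll() does on an empty queue. LinkedList is also not thread-safe, so unsynchronized access from several threads can leave a queue in an unexpected state. The following is a minimal JDK-only sketch of a task queue that guards against both. GuardedTaskQueue and its methods are hypothetical and do not reproduce the incubator's EventLoopScheduler, and whether a race is the actual cause here is an assumption.

```java
import java.util.LinkedList;
import java.util.Queue;

public class GuardedTaskQueue {

  // Hypothetical task queue; not the incubator's EventLoopScheduler.
  private final Queue<Runnable> tasks = new LinkedList<>();

  /** Adds a task; synchronized because LinkedList is not thread-safe. */
  public synchronized void submit(Runnable task) {
    tasks.add(task);
  }

  /** Takes the next task under the same lock; null means the queue is empty. */
  private synchronized Runnable next() {
    return tasks.poll();
  }

  /** Runs the next task if there is one and reports whether anything ran. */
  public boolean runNext() {
    Runnable task = next();
    if (task == null) {
      return false; // nothing queued, so there is nothing to call run() on
    }
    task.run(); // executed outside the lock
    return true;
  }

  public static void main(String[] args) {
    GuardedTaskQueue queue = new GuardedTaskQueue();
    System.out.println(queue.runNext());
    queue.submit(() -> System.out.println("task ran"));
    System.out.println(queue.runNext());
    System.out.println(queue.runNext());
  }
}
```

The null check turns an empty queue into a no-op instead of an exception, and the shared lock keeps add and poll from interleaving.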

Do you have a reproducer?

(Investigating)

Steps to reproduce

(Investigating)

Extra

macOS 13, Java 19

Abstract virtual thread verticle draft implementation

Describe the feature

This is a starting point for implementing an abstract virtual thread verticle. It works very well for me in production.

Use cases

All handlers created in the verticle's start will themselves run within virtual threads, which is excellent! This makes interacting with blocking Java APIs seamless.

Contribution

Here's my code that I have been using:

package com.hiddenswitch.framework.virtual.concurrent;

import io.vertx.await.impl.EventLoopScheduler;
import io.vertx.await.impl.VirtualThreadContext;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;

public abstract class AbstractVirtualThreadVerticle extends AbstractVerticle {

	private Context startingContext;
	private Context stoppingContext;

	@Override
	public final void init(Vertx vertx1, Context context1) {
		super.init(vertx1, context1);

		var context = (ContextInternal) context1;
		var scheduler = new EventLoopScheduler(context.nettyEventLoop());
		var vertx = (VertxInternal) vertx1;
		this.context = new VirtualThreadContext(vertx, context.nettyEventLoop(), vertx.getInternalWorkerPool(), vertx.getWorkerPool(), scheduler, context.getDeployment(), context.closeFuture(), Thread.currentThread().getContextClassLoader());
	}

	@Override
	public final void start() throws Exception {
	}

	@Override
	public final void stop() throws Exception {
	}

	@Override
	public final void start(Promise<Void> startPromise) throws Exception {
		// Remember the context this verticle was deployed on, so the promise is completed there.
		this.startingContext = Vertx.currentContext();
		// this.context was replaced with a VirtualThreadContext in init().
		context.runOnContext(v -> {
			try {
				startVirtual();
				startingContext.runOnContext(v2 -> startPromise.complete());
			} catch (Throwable t) {
				startingContext.runOnContext(v2 -> startPromise.fail(t));
			}
		});
	}

	@Override
	public final void stop(Promise<Void> stopPromise) throws Exception {
		this.stoppingContext = Vertx.currentContext();
		context.runOnContext(v -> {
			try {
				stopVirtual();
				stoppingContext.runOnContext(v2 -> stopPromise.complete());
			} catch (Throwable t) {
				stoppingContext.runOnContext(v2 -> stopPromise.fail(t));
			}
		});
	}

	public void startVirtual() throws Exception {
	}

	public void stopVirtual() throws Exception {
	}

	protected Context startingContext() {
		return startingContext;
	}

	protected Context stoppingContext() {
		return stoppingContext;
	}
}

Users should override startVirtual and stopVirtual, where they can await Vert.x Futures. That's it.
