Coder Social home page Coder Social logo

fs2-grpc's Introduction

fs2-grpc - gRPC implementation for FS2/cats-effect

Discord CI Latest version Code of Consuct Scala Steward badge

SBT configuration

project/plugins.sbt:

addSbtPlugin("org.typelevel" % "sbt-fs2-grpc" % "<latest-version>")

build.sbt:

enablePlugins(Fs2Grpc)

Depending if you wish to use grpc-netty or grpc-okhttp, add one of the following dependencies:

libraryDependencies += "io.grpc" % "grpc-netty-shaded" % scalapb.compiler.Version.grpcJavaVersion

or

libraryDependencies += "io.grpc" % "grpc-okhttp" % scalapb.compiler.Version.grpcJavaVersion

Protocol buffer files

The protobuf files should be stored in the directory <project_root>/src/main/protobuf.

Multiple projects

If the generated code is used by multiple projects, you may build the client/server code in a common project which other projects depend on. For example:

lazy val protobuf =
  project
    .in(file("protobuf"))
    .enablePlugins(Fs2Grpc)

lazy val client =
  project
    .in(file("client"))
    .dependsOn(protobuf)

lazy val server =
  project
    .in(file("server"))
    .dependsOn(protobuf)

Creating a client

A ManagedChannel is the type used by grpc-java to manage a connection to a particular server. This library provides syntax for ManagedChannelBuilder which creates a Resource which can manage the shutdown of the channel, by calling .resource[F] where F has an instance of the Sync typeclass. This implementation will do a drain of the channel, and attempt to shut down the channel, forcefully closing after 30 seconds. An example of the syntax using grpc-netty is:

import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
import fs2.grpc.syntax.all._

val managedChannelResource: Resource[IO, ManagedChannel] =
  NettyChannelBuilder
    .forAddress("127.0.0.1", 9999)
    .resource[IO]

The syntax also offers the method resourceWithShutdown which takes a function ManagedChannel => F[Unit] which is used to manage the shutdown. This may be used where requirements before shutdown do not match the default behaviour.

The generated code provides a method stubResource[F], for any F which has a Async instance, and it takes a parameter of type Channel. It returns a Resource with an implementation of the service (in a trait), which can be used to make calls.

Moreover, the generated code provides method overloads that take ClientOptions used for configuring calls.

def runProgram(stub: MyFs2Grpc[IO]): IO[Unit] = ???

val run: IO[Unit] = managedChannelResource
  .flatMap(ch => MyFs2Grpc.stubResource[IO](ch))
  .use(runProgram)

Creating a server

The generated code provides a method bindServiceResource[F], for any F which has a Async instance, and it takes an implementation of the service (in a trait), which is used to serve responses to RPC calls. It returns a Resource[F, ServerServiceDefinition] which is given to the server builder when setting up the service. Furthermore, the generated code provides method overloads that take ServerOptions used for configuring service calls.

A Server is the type used by grpc-java to manage the server connections and lifecycle. This library provides syntax for ServerBuilder, which mirrors the pattern for the client. An example is:

import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
import fs2.grpc.syntax.all._

val helloService: Resource[IO, ServerServiceDefinition] = 
  MyFs2Grpc.bindServiceResource[IO](new MyImpl())

def run(service: ServerServiceDefinition) = NettyServerBuilder
  .forPort(9999)
  .addService(service)
  .resource[IO]
  .evalMap(server => IO(server.start()))
  .useForever

helloService.use(run)

Code generation options

To alter code generation, you can set some flags with scalapbCodeGeneratorOptions, e.g.:

scalapbCodeGeneratorOptions += CodeGeneratorOption.FlatPackage

The full set of options available are:

  • CodeGeneratorOption.FlatPackage - If true, the compiler does not append the proto base file name
  • CodeGeneratorOption.JavaConversions - Enable Java conversions for protobuf
  • CodeGeneratorOption.Grpc (included by default) - generate grpc bindings based on Observables
  • CodeGeneratorOption.Fs2Grpc (included by default) - generate grpc bindings for FS2/cats
  • CodeGeneratorOption.SingleLineToProtoString - toProtoString generates single line
  • CodeGeneratorOption.AsciiFormatToString - toString uses toProtoString functionality

Pass additional protoc options

PB.protocOptions in Compile := Seq("-xyz")

Tool Sponsorship

Development of fs2-grpc is generously supported in part by YourKit through the use of their excellent Java profiler.

fs2-grpc's People

Contributors

ahjohannessen avatar armanbilge avatar diesalbla avatar fedefernandez avatar fiadliel avatar fredfp avatar frosforever avatar gitter-badger avatar gorzen avatar gurinderu avatar justcoon avatar kczulko avatar lacarvalho91 avatar larsrh avatar mackenziestarr avatar naoh87 avatar nikiforo avatar qi77qi avatar rossabaker avatar scala-steward avatar sideeffffect avatar sullis avatar talkingfoxmid avatar tapped avatar thesamet avatar timbertson avatar typelevel-steward[bot] avatar valdemargr avatar valencik avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

fs2-grpc's Issues

Grpc status exceptions are rethrown

In our app we are experiencing exceptions in our (stderr) logs even though they are grpc status exceptions.

It seems that status exceptions are logged by the cats-effect runtime when thrown, since guarentee rethrows.

private def run[F[_]](completed: F[Unit], dispatcher: Dispatcher[F])(implicit F: Sync[F]): SyncIO[Cancel] = {
SyncIO {
val cancel = dispatcher.unsafeRunCancelable(F.guaranteeCase(completed) {
case Outcome.Succeeded(_) => close(Status.OK, new Metadata()).to[F]
case Outcome.Errored(e) => handleError(e).to[F]
case Outcome.Canceled() => close(Status.CANCELLED, new Metadata()).to[F]
})
SyncIO(cancel()).void
}
}

I'm not sure what the best course of action is for non-status exceptions, but for status exceptions I believe it doesn't make much sense to rethrow them.

We are using the following fix locally:

  private def run[F[_]](completed: F[Unit], dispatcher: Dispatcher[F])(implicit F: Sync[F]): SyncIO[Cancel] = {
    SyncIO {
      val cancel = dispatcher.unsafeRunCancelable(
        F.recover {
          F.guaranteeCase(completed) {
            case Outcome.Succeeded(_) => close(Status.OK, new Metadata()).to[F]
            case Outcome.Errored(e) => handleError(e).to[F]
            case Outcome.Canceled() => close(Status.CANCELLED, new Metadata()).to[F]
          }
        } { case _: StatusException | _: StatusRuntimeException =>
          ()
        }
      )
      SyncIO(cancel()).void
    }
  }

If possible, it would be nice to either make the handler for non-status exceptions configurable or not throw them at all, since the default exception reporter does not respect logging configuration.

Cannot generate via protoc binary

Hello,

I am struggling with the following issue:

  1. Create a dummy proto file:
$ cat message.proto
syntax = "proto3";

package com.book;

message Book {
    int64 isbn = 1;
    string title = 2;
    string author = 3;
}
  1. Download fs2-grpc plugin (or wrap the jar in a script to make it executable):
$ wget https://repo.maven.apache.org/maven2/org/typelevel/protoc-gen-fs2-grpc/2.7.14/protoc-gen-fs2-grpc-2.7.14-unix.sh  
$ chmod +x protoc-gen-fs2-grpc-2.7.14-unix.sh 
  1. Run protoc:
$ protoc --version
libprotoc 24.4
$ mkdir haha
$ protoc --plugin=protoc-gen-fs2_grpc_codegen=./protoc-gen-fs2-grpc-2.7.14-unix.sh --fs2_grpc_codegen_opt=grpc,flat_package --fs2_grpc_codegen_out=./haha message.proto
$ cd haha && ll
total 0

What am I doing wrong? :)

Thanks in advance,
Karol

No throttling of message sending

Neither client nor server support flow control when sending messages (they do request
messages, so will interoperate with other servers and clients). This is because they
do not check call.isReady, or set the onReady handler.

Without this, the code which generates messages will not be throttled, and the code may
OOM in some situations.

use sbt-ci-release

Would it be an idea to use sbt-ci-release such that we can autopublish releases via tags?

Run fs2-grpc service with a caller thread such as an eventloop thread

While testing fs2-grpc with http4s-armeria, I realized that a gRPC service does not run in an event loop(the caller thread) of Armeria.
It was executed with an ioapp-compute-* thread instead.

Running a service in an event loop is an important feature in Armeria because:

  • Thread is a context-aware thread. That means it propagates a context to other scopes for Zipkin and logging.
  • We can reduce the context switching cost by directly executing an asynchronous service in an event loop.
  • Users get ServiceRequestContext from the thread local which can accesses a structured log.

Is there a way to configure working threads?

Related discussion: grpc/grpc-kotlin#66

A closed client can still receive events

To be more precise, onClose can be called on the underlying Fs2StreamClientCallListener while the client (and its dispatcher) are already closed. I don't have a full end-to-end example, but it comes down to this:

val someServiceResource: Resource[IO, SomeServiceFs2Grpc[IO, Unit]] = ???

someServiceResource
  .map(
    _.someStream(SomeStreamRequest(), ()).interruptAfter(1.second).compile.drain
  )
  .useEval
  .unsafeRunSync()

Output:

Jun 07, 2023 1:52:23 PM io.grpc.internal.SerializingExecutor run
SEVERE: Exception while executing runnable io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed@13affa0e
java.lang.IllegalStateException: dispatcher already shutdown
        at cats.effect.std.Dispatcher$$anon$2.unsafeToFutureCancelable(Dispatcher.scala:422)
        at cats.effect.std.DispatcherPlatform.unsafeRunTimed(DispatcherPlatform.scala:59)
        at cats.effect.std.DispatcherPlatform.unsafeRunTimed$(DispatcherPlatform.scala:58)
        at cats.effect.std.Dispatcher$$anon$2.unsafeRunTimed(Dispatcher.scala:317)
        at cats.effect.std.DispatcherPlatform.unsafeRunSync(DispatcherPlatform.scala:51)
        at cats.effect.std.DispatcherPlatform.unsafeRunSync$(DispatcherPlatform.scala:50)
        at cats.effect.std.Dispatcher$$anon$2.unsafeRunSync(Dispatcher.scala:317)
        at fs2.grpc.client.Fs2StreamClientCallListener.onClose(Fs2StreamClientCallListener.scala:43)
        at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:468)
        at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:432)
        at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:465)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562)
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

So interruptAfter preemptively stops the stream after which the surrounding resource is immediately closed. But onClose still gets called afterwards and invokes the already closed dispatcher.

specify location of protocol buffer files

Hello! It's reallly good to see project like this adapting grpc to cats-effect ecosystem. My daily codebase uses a lot of gRPC. My question is wrether is it possible to change default <project_root>/src/main/protobuf location? With sbt-protoc it was straightforward since I had to add option

PB.protoSources in Compile := Seq(
    ".." / "protobufs" / "service1",
    ".." / "protobufs" / "service2"
  )

Can I achieve this in fs2-grpc? If not, maybe it's good first issue to start contributing to project?

Let me know what you think

InternalError exception when switching threads inside server implementation

io.grpc.StatusRuntimeException: INTERNAL: Calling TypedActor.context outside of a TypedActor implementation method!
	at io.grpc.Status.asRuntimeException(Status.java:533)
	at org.lyranthe.fs2_grpc.java_runtime.client.Fs2UnaryClientCallListener.$anonfun$getValue$2(Fs2UnaryClientCallListener.scala:26)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:145)
	at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:366)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:387)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:330)
	at cats.effect.internals.Callback$AsyncIdempotentCallback.run(Callback.scala:131)
	at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:67)
	at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:35)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:90)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:90)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:90)
	at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
	at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:42)
	at cats.effect.internals.Callback$AsyncIdempotentCallback.apply(Callback.scala:137)
	at cats.effect.internals.Callback$AsyncIdempotentCallback.apply(Callback.scala:126)
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$unsafeRegister$1(Deferred.scala:204)
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$unsafeRegister$1$adapted(Deferred.scala:204)
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$notifyReadersLoop$1(Deferred.scala:239)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:87)
	at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:366)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:387)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:330)
	at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
	at cats.effect.internals.PoolUtils$$anon$2$$anon$3.run(PoolUtils.scala:52)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Process finished with exit code 1

It is caused by calling IO.fromFuture which I assume cross threads boundaries. Following code in server implementation causes this error:

  override def getAccount(request: AccountRef, ctx: Metadata): IO[GrpcAccount] = {
    IO.fromFuture(IO.delay(Future(Account(1,"123",None,"cd"))))
  }

changing it to

override def getAccount(request: AccountRef, ctx: Metadata): IO[GrpcAccount] = {
    IO.delay(Account(1,"123",None,"cd"))
  }

fixes the problem but that's not an option for us.

Enabling JavaConversions together with FlatPackage breaks compilation

See title:

This works:

scalapbCodeGeneratorOptions ++= Seq(FlatPackage)

This doesn't:

scalapbCodeGeneratorOptions ++= Seq(FlatPackage , JavaConversions)

Note: I'm using features such as oneof, so it might be a special case. Let me know if you need more information to reproduce this. I'll also look into it when I have a bit more spare time.

Should Metadata be provided via Kleisli?

Currently, trait looks like

trait HelloService[F[_]] {
  def sayHello(request: HelloRequest, metadata: Metadata): F[HelloResponse]
}

should there instead be something like this?

trait HelloService[F[_]] {
  def sayHello(request: HelloRequest): F[HelloResponse]
}

trait MetadataHelloService[F[_]] extends HelloService[Kleisli[F, Metadata, ?]] {
  def sayHello(request: HelloRequest): Kleisli[F, MetadataService, HelloResponse]
}

Support code gen using ? for wildcards

In Scala 3 it is possible to use _ for type lambda placeholders by setting the -Ykind-projector:underscores compiler flag. When this is enabled _ can no longer be used for wildcards, ? must be used instead. The code gen doesn't support this ATM so you end up with compiler errors from the generated code.

Scalajs/grpc-web support?

Is there any way to support scalajs so the client can be used in a browser context? Also is there a possibility of supporting grpc-web requests in the server without having to use a proxy?

Context was not attached when detaching

There is a bug in 0.5.1 that cause an exception to be thrown whenever the rpc impl. is doing context shifting. First reported on github by @ahjohannessen, see https://github.com/fiadliel/fs2-grpc/pull/86/files#r325043454.

The error is Context was not attached when detaching. We are now assuming that we are on the same thread context when releasing the gRPC context, see https://github.com/cognitedata/fs2-grpc/blob/25456d44e84fc3e14a9961dc88d6704584724918/java-runtime/src/main/scala/server/Fs2ServerCallListener.scala#L48

Proposed solution is to make sure that we only release the context on the right thread context by shifting the release effect of the resource to the same thread as acquire. I'm not sure of a good way of doing that, so alternatively we may leak the context binding, which in practice is not an issue I believe, if we assume that no else are binding gRPC context to the thread pool used by fs2-grpc.

Add context propagation?

grpc-java includes opencensus integration, which allows tracing of requests across a call graph. While this would be nice to have, I don't think the context is propagated across asynchronous call boundaries (it's in a ThreadLocalStorage).

Should investigate possible ways of making this context available.

Support generating code from proto in src/test/protobuf

We have proto files in src/test/protobuf which doesn't seem to be supported out of the box. To get it to work I've had to copy paste some settings from

override def projectSettings: Seq[Def.Setting[_]] =
.

inConfig(Test)(
  Seq(
    PB.targets := Fs2GrpcPlugin.autoImport.scalapbCodeGenerators.value,
    PB.protoSources := Seq(sourceDirectory.value / "protobuf"),
    fs2GrpcServiceSuffix := "Fs2Grpc",
    fs2GrpcOutputPath := sourceManaged.value / "fs2-grpc",
    scalapbCodeGeneratorOptions := Seq(CodeGeneratorOption.Grpc, CodeGeneratorOption.Fs2Grpc),
    scalapbCodeGenerators := {
      val options = scalapbCodeGeneratorOptions.value.toSet
      Target(
        scalapb.gen(
          flatPackage = options(CodeGeneratorOption.FlatPackage),
          javaConversions = options(CodeGeneratorOption.JavaConversions),
          grpc = options(CodeGeneratorOption.Grpc),
          singleLineToProtoString = options(CodeGeneratorOption.SingleLineToProtoString),
          asciiFormatToString = options(CodeGeneratorOption.AsciiFormatToString)
        ),
        sourceManaged.value / "scalapb"
      ) ::
        Option(
          Target(
            (
              SandboxedJvmGenerator.forModule(
                "scala-fs2-grpc",
                Artifact(
                  BuildInfo.organization,
                  s"${BuildInfo.codeGeneratorName}_${CrossVersion.binaryScalaVersion(BuildInfo.scalaVersion)}",
                  BuildInfo.version
                ),
                BuildInfo.codeGeneratorFullName + "$",
                Nil
              ),
              scalapbCodeGeneratorOptions.value.filterNot(_ == CodeGeneratorOption.Fs2Grpc).map(_.toString) :+
                s"serviceSuffix=${fs2GrpcServiceSuffix.value}"
            ),
            fs2GrpcOutputPath.value
          )
        ).filter(_ => scalapbCodeGeneratorOptions.value.contains(CodeGeneratorOption.Fs2Grpc)).toList
    }
  )
)

It would be good if we supported this out of the box like Akka gRPC does here.

MiMa failing

Due to #477, sorry! I'm a little confused how it passed CI but not on main.

[error] (codegen / mimaPreviousClassfiles) sbt.librarymanagement.ResolveException: Error downloading org.typelevel:fs2-grpc-codegen_2.12:2.4.3

Why does it publish a double artifact

Calling +publishSigned results in this. Figure out why.

[error] (java-runtime / publishSigned) java.io.IOException: PUT operation failed because the destination file exists and overwriting is disabled:
[error]     source     : /home/ross/src/fs2-grpc/java-runtime/target/scala-2.12/java-runtime_2.12-0.4.0-M5.pom
[error]     destination: https://oss.sonatype.org/service/local/staging/deploy/maven2/org/lyranthe/fs2-grpc/java-runtime_2.12/0.4.0-M5/java-runtime_2.12-0.4.0-M5.pom

Both the _2.11 and _2.12 artifacts are published before this happens.

Since v2.5.0 `fs2-grpc-runtime_2.13-2.5.0.jar` is actually a Scala 3 artifact ...

$ wget https://repo1.maven.org/maven2/org/typelevel/fs2-grpc-runtime_2.13/2.5.0/fs2-grpc-runtime_2.13-2.5.0.jar
$ less fs2-grpc-runtime_2.13-2.5.0.jar
Archive:  fs2-grpc-runtime_2.13-2.5.0.jar
 Length   Method    Size  Cmpr    Date    Time   CRC-32   Name
--------  ------  ------- ---- ---------- ----- --------  ----
     378  Defl:N      172  55% 2010-01-01 00:00 94f392f2  META-INF/MANIFEST.MF
       0  Stored        0   0% 2010-01-01 00:00 00000000  fs2/
       0  Stored        0   0% 2010-01-01 00:00 00000000  fs2/grpc/
       0  Stored        0   0% 2010-01-01 00:00 00000000  fs2/grpc/client/
       0  Stored        0   0% 2010-01-01 00:00 00000000  fs2/grpc/client/internal/
       0  Stored        0   0% 2010-01-01 00:00 00000000  fs2/grpc/server/
       0  Stored        0   0% 2010-01-01 00:00 00000000  fs2/grpc/server/internal/
       0  Stored        0   0% 2010-01-01 00:00 00000000  fs2/grpc/shared/
       0  Stored        0   0% 2010-01-01 00:00 00000000  fs2/grpc/syntax/
   26158  Defl:N     8521  67% 2010-01-01 00:00 bdb1e81a  fs2/grpc/GeneratedCompanion.class
    9703  Defl:N     4710  52% 2010-01-01 00:00 9291ba4a  fs2/grpc/GeneratedCompanion.tasty
     868  Defl:N      443  49% 2010-01-01 00:00 88982cd1  fs2/grpc/client/ClientOptions$$anon$1.class
    1223  Defl:N      633  48% 2010-01-01 00:00 4d611eca  fs2/grpc/client/ClientOptions$$anon$2$$anonfun$$lessinit$greater$1.class
     756  Defl:N      422  44% 2010-01-01 00:00 a706632c  fs2/grpc/client/ClientOptions$$anon$2.class
     699  Defl:N      399  43% 2010-01-01 00:00 6a90e8c5  fs2/grpc/client/ClientOptions$.class
    4516  Defl:N     2139  53% 2010-01-01 00:00 bac4f99e  fs2/grpc/client/ClientOptions.class
    2597  Defl:N     1955  25% 2010-01-01 00:00 39629ff1  fs2/grpc/client/ClientOptions.tasty
    2987  Defl:N     1278  57% 2010-01-01 00:00 f926f28a  fs2/grpc/client/Fs2ClientCall$$anon$1.class
    3309  Defl:N     1420  57% 2010-01-01 00:00 d47be9cb  fs2/grpc/client/Fs2ClientCall$$anonfun$1.class
     840  Defl:N      463  45% 2010-01-01 00:00 ce33fdf0  fs2/grpc/client/Fs2ClientCall$.class
    4141  Defl:N     1637  61% 2010-01-01 00:00 45d49021  fs2/grpc/client/Fs2ClientCall$PartiallyAppliedClientCall$.class
    2168  Defl:N      873  60% 2010-01-01 00:00 053a0c2c  fs2/grpc/client/Fs2ClientCall$PartiallyAppliedClientCall.class
   21359  Defl:N     9151  57% 2010-01-01 00:00 523674d6  fs2/grpc/client/Fs2ClientCall.class
    9457  Defl:N     6620  30% 2010-01-01 00:00 7bd035c5  fs2/grpc/client/Fs2ClientCall.tasty
    3097  Defl:N     1246  60% 2010-01-01 00:00 06e94f6b  fs2/grpc/client/Fs2StreamClientCallListener$.class
    4587  Defl:N     2510  45% 2010-01-01 00:00 909972ad  fs2/grpc/client/Fs2StreamClientCallListener.class
    2832  Defl:N     2195  23% 2010-01-01 00:00 8b43260a  fs2/grpc/client/Fs2StreamClientCallListener.tasty
    2077  Defl:N      939  55% 2010-01-01 00:00 173a4d4f  fs2/grpc/client/GrpcStatus$.class
    5394  Defl:N     2812  48% 2010-01-01 00:00 89331d27  fs2/grpc/client/GrpcStatus.class
    2555  Defl:N     2002  22% 2010-01-01 00:00 a79aec69  fs2/grpc/client/GrpcStatus.tasty
    6710  Defl:N     2915  57% 2010-01-01 00:00 10c5a9bc  fs2/grpc/client/StreamIngest$$anon$1.class
    3312  Defl:N     1304  61% 2010-01-01 00:00 4a412ea8  fs2/grpc/client/StreamIngest$.class
    2801  Defl:N     1633  42% 2010-01-01 00:00 82b56a98  fs2/grpc/client/StreamIngest.class
    4553  Defl:N     3284  28% 2010-01-01 00:00 3986b59d  fs2/grpc/client/StreamIngest.tasty
    1814  Defl:N      843  54% 2010-01-01 00:00 e7aec1b7  fs2/grpc/client/UnaryResult$.class
    5747  Defl:N     2872  50% 2010-01-01 00:00 1b0c0dd2  fs2/grpc/client/UnaryResult.class
    2753  Defl:N     2190  21% 2010-01-01 00:00 3c8221e5  fs2/grpc/client/UnaryResult.tasty
    6401  Defl:N     2318  64% 2010-01-01 00:00 77017245  fs2/grpc/client/internal/Fs2UnaryCallHandler$$anon$1.class
   12712  Defl:N     4410  65% 2010-01-01 00:00 f7674de7  fs2/grpc/client/internal/Fs2UnaryCallHandler$.class
     693  Defl:N      357  49% 2010-01-01 00:00 dec7bd61  fs2/grpc/client/internal/Fs2UnaryCallHandler$Done.class
    3764  Defl:N     1387  63% 2010-01-01 00:00 f304d53d  fs2/grpc/client/internal/Fs2UnaryCallHandler$PendingHalfClose.class
    3443  Defl:N     1289  63% 2010-01-01 00:00 5099df1c  fs2/grpc/client/internal/Fs2UnaryCallHandler$PendingMessage.class
    4314  Defl:N     1813  58% 2010-01-01 00:00 401a9b84  fs2/grpc/client/internal/Fs2UnaryCallHandler$ReceiveState$.class
     359  Defl:N      224  38% 2010-01-01 00:00 9e067ae4  fs2/grpc/client/internal/Fs2UnaryCallHandler$ReceiveState.class
    4818  Defl:N     2855  41% 2010-01-01 00:00 2c74d8e1  fs2/grpc/client/internal/Fs2UnaryCallHandler.class
   10777  Defl:N     6996  35% 2010-01-01 00:00 a4124994  fs2/grpc/client/internal/Fs2UnaryCallHandler.tasty
    8422  Defl:N     2814  67% 2010-01-01 00:00 5d28ae79  fs2/grpc/server/Fs2ServerCall$.class
    7238  Defl:N     3212  56% 2010-01-01 00:00 19f073ac  fs2/grpc/server/Fs2ServerCall.class
    2990  Defl:N     2189  27% 2010-01-01 00:00 f62f6498  fs2/grpc/server/Fs2ServerCall.tasty
    3988  Defl:N     1545  61% 2010-01-01 00:00 e6999ba5  fs2/grpc/server/Fs2ServerCallHandler$$anon$1.class
    5945  Defl:N     2272  62% 2010-01-01 00:00 4fcf6b7d  fs2/grpc/server/Fs2ServerCallHandler$$anon$2.class
    1200  Defl:N      552  54% 2010-01-01 00:00 0cfb8005  fs2/grpc/server/Fs2ServerCallHandler$.class
    5014  Defl:N     2247  55% 2010-01-01 00:00 b4af0386  fs2/grpc/server/Fs2ServerCallHandler.class
    4876  Defl:N     3356  31% 2010-01-01 00:00 dbd3bf76  fs2/grpc/server/Fs2ServerCallHandler.tasty
   11045  Defl:N     4844  56% 2010-01-01 00:00 413cc082  fs2/grpc/server/Fs2ServerCallListener.class
    5443  Defl:N     3717  32% 2010-01-01 00:00 63219539  fs2/grpc/server/Fs2ServerCallListener.tasty
     948  Defl:N      471  50% 2010-01-01 00:00 cc9f4733  fs2/grpc/server/Fs2StreamServerCallListener$.class
    7887  Defl:N     2689  66% 2010-01-01 00:00 f01e0e65  fs2/grpc/server/Fs2StreamServerCallListener$PartialFs2StreamServerCallListener$.class
    2305  Defl:N      898  61% 2010-01-01 00:00 a688881e  fs2/grpc/server/Fs2StreamServerCallListener$PartialFs2StreamServerCallListener.class
    8370  Defl:N     4311  49% 2010-01-01 00:00 077e0e45  fs2/grpc/server/Fs2StreamServerCallListener.class
    4960  Defl:N     3673  26% 2010-01-01 00:00 6348004c  fs2/grpc/server/Fs2StreamServerCallListener.tasty
    1743  Defl:N      835  52% 2010-01-01 00:00 430e63f7  fs2/grpc/server/GzipCompressor$.class
    2027  Defl:N     1241  39% 2010-01-01 00:00 6e6f1280  fs2/grpc/server/GzipCompressor.class
    1178  Defl:N      988  16% 2010-01-01 00:00 d72699fe  fs2/grpc/server/GzipCompressor.tasty
     762  Defl:N      407  47% 2010-01-01 00:00 b246a11c  fs2/grpc/server/ServerCallOptions$$anon$1.class
     578  Defl:N      346  40% 2010-01-01 00:00 1a655c75  fs2/grpc/server/ServerCallOptions$$anon$2.class
     723  Defl:N      408  44% 2010-01-01 00:00 a88ed152  fs2/grpc/server/ServerCallOptions$.class
    3080  Defl:N     1592  48% 2010-01-01 00:00 17b93fe3  fs2/grpc/server/ServerCallOptions.class
    1914  Defl:N     1464  24% 2010-01-01 00:00 0b7e3aa6  fs2/grpc/server/ServerCallOptions.tasty
    1905  Defl:N     1017  47% 2010-01-01 00:00 193d76a7  fs2/grpc/server/ServerCompressor.class
     605  Defl:N      557   8% 2010-01-01 00:00 a6c5ffdd  fs2/grpc/server/ServerCompressor.tasty
     704  Defl:N      374  47% 2010-01-01 00:00 a4dd9019  fs2/grpc/server/ServerOptions$$anon$1.class
    1307  Defl:N      637  51% 2010-01-01 00:00 0ebeb11c  fs2/grpc/server/ServerOptions$$anon$2$$anonfun$$lessinit$greater$1.class
     596  Defl:N      345  42% 2010-01-01 00:00 4ce7554b  fs2/grpc/server/ServerOptions$$anon$2.class
     699  Defl:N      399  43% 2010-01-01 00:00 769e3d24  fs2/grpc/server/ServerOptions$.class
    2174  Defl:N     1092  50% 2010-01-01 00:00 407a5aae  fs2/grpc/server/ServerOptions.class
    1390  Defl:N     1130  19% 2010-01-01 00:00 72d617b1  fs2/grpc/server/ServerOptions.tasty
    3643  Defl:N     1438  61% 2010-01-01 00:00 6af038df  fs2/grpc/server/internal/Fs2ServerCall$.class
   12928  Defl:N     5715  56% 2010-01-01 00:00 b19c28ea  fs2/grpc/server/internal/Fs2ServerCall.class
    6258  Defl:N     4441  29% 2010-01-01 00:00 8abd7c48  fs2/grpc/server/internal/Fs2ServerCall.tasty
    6365  Defl:N     2340  63% 2010-01-01 00:00 d346044e  fs2/grpc/server/internal/Fs2UnaryServerCallHandler$$anon$1.class
    4729  Defl:N     1689  64% 2010-01-01 00:00 7f124930  fs2/grpc/server/internal/Fs2UnaryServerCallHandler$$anon$2.class
    5582  Defl:N     1964  65% 2010-01-01 00:00 73853496  fs2/grpc/server/internal/Fs2UnaryServerCallHandler$$anon$3.class
    6901  Defl:N     2026  71% 2010-01-01 00:00 037543c2  fs2/grpc/server/internal/Fs2UnaryServerCallHandler$.class
    2000  Defl:N      844  58% 2010-01-01 00:00 5834be38  fs2/grpc/server/internal/Fs2UnaryServerCallHandler$Called$.class
    3615  Defl:N     1360  62% 2010-01-01 00:00 1983929d  fs2/grpc/server/internal/Fs2UnaryServerCallHandler$Called.class
    2151  Defl:N      911  58% 2010-01-01 00:00 c490c947  fs2/grpc/server/internal/Fs2UnaryServerCallHandler$CallerState$.class
     375  Defl:N      225  40% 2010-01-01 00:00 f13d191e  fs2/grpc/server/internal/Fs2UnaryServerCallHandler$CallerState.class
    1663  Defl:N      711  57% 2010-01-01 00:00 8923a4ec  fs2/grpc/server/internal/Fs2UnaryServerCallHandler$Cancelled$.class
    2783  Defl:N     1083  61% 2010-01-01 00:00 3b337394  fs2/grpc/server/internal/Fs2UnaryServerCallHandler$Cancelled.class
    2374  Defl:N      947  60% 2010-01-01 00:00 061e8992  fs2/grpc/server/internal/Fs2UnaryServerCallHandler$PendingHalfClose$.class
    5785  Defl:N     2040  65% 2010-01-01 00:00 282dd7da  fs2/grpc/server/internal/Fs2UnaryServerCallHandler$PendingHalfClose.class
    2158  Defl:N      874  60% 2010-01-01 00:00 521c4d1c  fs2/grpc/server/internal/Fs2UnaryServerCallHandler$PendingMessage$.class
    4304  Defl:N     1507  65% 2010-01-01 00:00 c69995a3  fs2/grpc/server/internal/Fs2UnaryServerCallHandler$PendingMessage.class
    8176  Defl:N     4915  40% 2010-01-01 00:00 407a535a  fs2/grpc/server/internal/Fs2UnaryServerCallHandler.class
   12975  Defl:N     8194  37% 2010-01-01 00:00 9b8343d3  fs2/grpc/server/internal/Fs2UnaryServerCallHandler.tasty
    5684  Defl:N     2027  64% 2010-01-01 00:00 906897ed  fs2/grpc/shared/StreamOutput$.class
    4410  Defl:N     2147  51% 2010-01-01 00:00 8c593f70  fs2/grpc/shared/StreamOutput.class
    3133  Defl:N     2302  27% 2010-01-01 00:00 d7d50a47  fs2/grpc/shared/StreamOutput.tasty
    7310  Defl:N     3312  55% 2010-01-01 00:00 196cacbe  fs2/grpc/shared/StreamOutputImpl.class
    2513  Defl:N     2031  19% 2010-01-01 00:00 c8d4e352  fs2/grpc/shared/StreamOutputImpl.tasty
     529  Defl:N      419  21% 2010-01-01 00:00 d42c7380  fs2/grpc/syntax/AllSyntax.class
     446  Defl:N      407   9% 2010-01-01 00:00 15fe2452  fs2/grpc/syntax/AllSyntax.tasty
    6146  Defl:N     2235  64% 2010-01-01 00:00 1f417534  fs2/grpc/syntax/ManagedChannelBuilderOps$.class
    6710  Defl:N     2962  56% 2010-01-01 00:00 e0e30bb4  fs2/grpc/syntax/ManagedChannelBuilderOps.class
    4489  Defl:N     2621  42% 2010-01-01 00:00 5a65c9c9  fs2/grpc/syntax/ManagedChannelBuilderOps.tasty
    1488  Defl:N      821  45% 2010-01-01 00:00 8ffcd525  fs2/grpc/syntax/ManagedChannelBuilderSyntax.class
     913  Defl:N      727  20% 2010-01-01 00:00 29cf613e  fs2/grpc/syntax/ManagedChannelBuilderSyntax.tasty
    5840  Defl:N     2223  62% 2010-01-01 00:00 be92400d  fs2/grpc/syntax/ServerBuilderOps$.class
    6385  Defl:N     2898  55% 2010-01-01 00:00 e8d8e03d  fs2/grpc/syntax/ServerBuilderOps.class
    4355  Defl:N     2586  41% 2010-01-01 00:00 5dbbee38  fs2/grpc/syntax/ServerBuilderOps.tasty
    1343  Defl:N      778  42% 2010-01-01 00:00 7295a4b1  fs2/grpc/syntax/ServerBuilderSyntax.class
     858  Defl:N      709  17% 2010-01-01 00:00 71266bf0  fs2/grpc/syntax/ServerBuilderSyntax.tasty
     421  Defl:N      297  30% 2010-01-01 00:00 e3cdcf88  fs2/grpc/syntax/package$.class
    1625  Defl:N      658  60% 2010-01-01 00:00 5081fafb  fs2/grpc/syntax/package$all$.class
    1171  Defl:N      541  54% 2010-01-01 00:00 37e7af5d  fs2/grpc/syntax/package$managedChannelBuilder$.class
    1055  Defl:N      530  50% 2010-01-01 00:00 6da1b50c  fs2/grpc/syntax/package$serverBuilder$.class
     902  Defl:N      641  29% 2010-01-01 00:00 c36bab10  fs2/grpc/syntax/package.class
    1070  Defl:N      855  20% 2010-01-01 00:00 3e0cc8a0  fs2/grpc/syntax/package.tasty
--------          -------  ---                            -------
  452249           218803  52%                            119 files

Code ends up generated into wrong directory

Screenshot from 2020-09-15 14-56-55

As you can see on the screenshot GreeterFs2Grpc is located under /fs2-grpc/examples/addressbook while it has a different package:

package examples.addressbook

import _root_.cats.implicits._

trait GreeterFs2Grpc[F[_], A] {
  def greet(request: examples.addressbook.Request, ctx: A): F[examples.addressbook.Response]
  def points(request: examples.addressbook.Request, ctx: A): _root_.fs2.Stream[F, examples.addressbook.Point]
  def bidi(request: _root_.fs2.Stream[F, examples.addressbook.Point], ctx: A): _root_.fs2.Stream[F, examples.addressbook.Response]
}

This prevents project from compiling.
scalaVersion = "2.12.10"
sbt = 1.3.8

Make Fs2CodeGenerator available in artifact

I'm using ScalaPB in an environment with no SBT (we're using Pants) using ScalaPBC instead of the SBT plugin to generate the proto implementation.

ScalaPBC has the option of specifying a custom code generator (instead of the default ScalaPbCodeGenerator). Would it be possible to publish the FS2CodeGenerator in an maven artifact? The org.lyranthe.fs2-grpc.java-runtime package does not seem to have this artifact

Running as standalone or with `mill`

Hi, I'm moving a project from sbt to mill and did not find a way to use the codegen outside of sbt. Is it possible to run the fs2-grpc codegen as a standalone, e.g. with protoc?

'2.7.10' release introduces SemVer incompatible dependency version changes

When we update our project to sbt-fs2-grpc:2.7.10 and use it with sbt-version-policy, we suddenly get incompatible version change errors:

[error] Incompatibilities with dependencies of grpc:1.1.0
[error]   com.google.guava:guava: incompatible version change from 31.0.1-android to 32.0.1-jre (compatibility: early semantic versioning)
[error]   com.google.j2objc:j2objc-annotations: incompatible version change from 1.3 to 2.8 (compatibility: early semantic versioning)
[error]   io.perfmark:perfmark-api: incompatible version change from 0.25.0 to 0.26.0 (compatibility: early semantic versioning)
[error]   io.grpc:grpc-context: missing dependency
[error]   org.checkerframework:checker-compat-qual: missing dependency

While manageable, it is unfortunate that this error happens during patch version update of a SBT plugin.

Aspect oriented programming as middleware

Grpc has support for interceptors on both client and server.
Having such tools in this library could facilitate improvements in usage (for instance tracing and authorization).

The ideas proposed here would also cover other similar issues (#627, #6, #567).

I have taken inspiration from cats-tagless for the Dom and Cod typeclasses.
https://github.com/typelevel/cats-tagless/blob/master/core/src/main/scala/cats/tagless/aop/Aspect.scala

The following example covers the server-side of an implementation, but a client aspect is very similar.

If you are interested I could put together a PR for this?

runtime module

import cats._
import cats.implicits._

// shared between client and server
case class CallContext[Req, Res](
  metadata: Metadata,
  methodDescriptor: io.grpc.MethodDescriptor[Req, Res]
)
trait ServiceAspect[F[_], Dom[_], Cod[_], A] { self =>
  def visitUnaryToUnary[Req, Res](
    callCtx: CallContext[Req, Res],
    dom: Dom[Req],
    cod: Cod[Res],
    request: (Req, A) => F[Res]
  ): Req => F[Res]

  def visitUnaryToStreaming[Req, Res](
    callCtx: CallContext[Req, Res],
    dom: Dom[Req],
    cod: Cod[Res],
    request: (Req, A) => fs2.Stream[F, Res]
  ): Req => fs2.Stream[F, Res]

  def visitStreamingToUnary[Req, Res](
    callCtx: CallContext[Req, Res],
    dom: Dom[Req],
    cod: Cod[Res],
    request: (fs2.Stream[F, Req], A) => F[Res]
  ): fs2.Stream[F, Req] => F[Res]

  def visitStreamingToStreaming[Req, Res](
    callCtx: CallContext[Req, Res],
    dom: Dom[Req],
    cod: Cod[Res],
    request: (fs2.Stream[F, Req], A) => fs2.Stream[F, Res]
  ): fs2.Stream[F, Req] => fs2.Stream[F, Res]

  def modify[B](f: A => F[B])(implicit F: Monad[F]): ServiceAspect[F, Dom, Cod, B] =
    new ServiceAspect[F, Dom, Cod, B] {
      override def visitUnaryToUnary[Req, Res](
        callCtx: CallContext[Req, Res],
        dom: Dom[Req],
        cod: Cod[Res],
        request: (Req, B) => F[Res]
      ): Req => F[Res] =
        self.visitUnaryToUnary[Req, Res](
          callCtx,
          dom,
          cod,
          (req, a) => f(a).flatMap(request(req, _))
        )

      override def visitUnaryToStreaming[Req, Res](
        callCtx: CallContext[Req, Res],
        dom: Dom[Req],
        cod: Cod[Res],
        request: (Req, B) => Stream[F, Res]
      ): Req => Stream[F, Res] =
        self.visitUnaryToStreaming[Req, Res](
          callCtx,
          dom,
          cod,
          (req, a) => fs2.Stream.eval(f(a)).flatMap(request(req, _))
        )

      override def visitStreamingToUnary[Req, Res](
        callCtx: CallContext[Req, Res],
        dom: Dom[Req],
        cod: Cod[Res],
        request: (Stream[F, Req], B) => F[Res]
      ): Stream[F, Req] => F[Res] =
        self.visitStreamingToUnary[Req, Res](
          callCtx,
          dom,
          cod,
          (req, a) => f(a).flatMap(request(req, _))
        )

      override def visitStreamingToStreaming[Req, Res](
        callCtx: CallContext[Req, Res],
        dom: Dom[Req],
        cod: Cod[Res],
        request: (Stream[F, Req], B) => Stream[F, Res]
      ): Stream[F, Req] => Stream[F, Res] =
        self.visitStreamingToStreaming[Req, Res](
          callCtx,
          dom,
          cod,
          (req, a) => fs2.Stream.eval(f(a)).flatMap(request(req, _))
        )
    }
}

// https://github.com/typelevel/cats-tagless/blob/master/core/src/main/scala/cats/tagless/Trivial.scala
final class Trivial[A] private extends Serializable

object Trivial {
  private val any                      = new Trivial[Any]
  implicit def instance[A]: Trivial[A] = any.asInstanceOf[Trivial[A]]
}

object ServiceAspect {
  def default[F[_], Dom[_], Cod[_]] = new ServiceAspect[F, Dom, Cod, io.grpc.Metadata] {
    override def visitUnaryToUnary[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (Req, Metadata) => F[Res]
    ): Req => F[Res] = request(_, callCtx.metadata)

    override def visitUnaryToStreaming[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (Req, Metadata) => Stream[F, Res]
    ): Req => Stream[F, Res] = request(_, callCtx.metadata)

    override def visitStreamingToUnary[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (Stream[F, Req], Metadata) => F[Res]
    ): Stream[F, Req] => F[Res] = request(_, callCtx.metadata)

    override def visitStreamingToStreaming[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (Stream[F, Req], Metadata) => Stream[F, Res]
    ): Stream[F, Req] => Stream[F, Res] = request(_, callCtx.metadata)
  }
}

codegen

case class Request()
case class Response()
case class StreamingResponse()

trait SomeServiceFs2Grpc[F[_], A] {
  def doSomething(request: Request, ctx: A): F[Response]
  def doSomethingStream(request: Request, ctx: A): fs2.Stream[F, StreamingResponse]
}
object SomeServiceFs2Grpc {
  def serviceBinding[F[_]: Async, Dom[_], Cod[_], A](
    dispatcher: Dispatcher[F],
    impl: SomeServiceFs2Grpc[F, A],
    serverOptions: ServerOptions,
    aspect: ServiceAspect[F, Dom, Cod, A]
  )(
    implicit domRequest: Dom[Request],
    codResponse: Cod[Response],
    codStreamingResponse: Cod[StreamingResponse]
  ) =
    io.grpc.ServerServiceDefinition
      .builder(SomeService.SERVICE)
      .addMethod(
        SomeService.METHOD_DO_SOMETHING,
        fs2.grpc.server
          .Fs2ServerCallHandler[F](dispatcher, serverOptions)
          .unaryToUnaryCall[Request, Response] { (r, m) =>
            aspect
              .visitUnaryToUnary[Request, Response](
                CallContext(m, SomeService.METHOD_DO_SOMETHING),
                domRequest,
                codResponse,
                (r, a) => impl.doSomething(r, a)
              )
              .apply(r)
          }
      )
      .addMethod(
        SomeService.METHOD_DO_SOMETHING_STREAM,
        fs2.grpc.server
          .Fs2ServerCallHandler[F](dispatcher, serverOptions)
          .unaryToStreamingCall[Request, StreamingResponse] { (r, m) =>
            aspect
              .visitUnaryToStreaming[Request, StreamingResponse](
                CallContext(m, SomeService.METHOD_DO_SOMETHING_STREAM),
                domRequest,
                codStreamingResponse,
                (r, a) => impl.doSomethingStream(r, a)
              )
              .apply(r)
          }
      )
      .build()
}

object SomeService {
  val METHOD_DO_SOMETHING: io.grpc.MethodDescriptor[Request, Response] = ???

  val METHOD_DO_SOMETHING_STREAM: io.grpc.MethodDescriptor[Request, StreamingResponse] = ???

  val SERVICE: io.grpc.ServiceDescriptor = ???
}

userland

val mySimpleImpl: SomeServiceFs2Grpc[IO, Metadata] = ???

val disp: Dispatcher[IO] = ???
SomeServiceFs2Grpc.serviceBinding(
  disp,
  mySimpleImpl,
  ServerOptions.default,
  ServiceAspect.default[IO, Trivial, Trivial]
)

// more complex usecase
type Auth = String
val myAuthedImpl: SomeServiceFs2Grpc[IO, Auth]     = ???
def extractAuthFromMetadata(m: Metadata): IO[Auth] = ???

SomeServiceFs2Grpc.serviceBinding(
  disp,
  myAuthedImpl,
  ServerOptions.default,
  ServiceAspect.default[IO, Trivial, Trivial].modify(extractAuthFromMetadata)
)

import natchez._
trait TracingAspect[A] {
  def key: String
  def value: String
}

def traceAspect[F[_]: Applicative: Trace, Cod[_], A](underlying: ServiceAspect[F, TracingAspect, Cod, A]) = {
  type Dom[A] = TracingAspect[A]
  new ServiceAspect[F, Dom, Cod, A] {
    override def visitUnaryToUnary[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (Req, A) => F[Res]
    ): Req => F[Res] = req =>
      Trace[F].span(callCtx.methodDescriptor.getFullMethodName()) {
        Trace[F].put(dom.key -> dom.value) *>
          underlying.visitUnaryToUnary[Req, Res](
            callCtx,
            dom,
            cod,
            (req, a) => request(req, a)
          )(req)
      }

    override def visitUnaryToStreaming[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (Req, A) => Stream[F, Res]
    ): Req => Stream[F, Res] = ???

    override def visitStreamingToUnary[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (Stream[F, Req], A) => F[Res]
    ): Stream[F, Req] => F[Res] = ???

    override def visitStreamingToStreaming[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (Stream[F, Req], A) => Stream[F, Res]
    ): Stream[F, Req] => Stream[F, Res] = ???

  }
}

Improve API for reporting failed RPC calls

To fail a response, you can raise a StatusRuntimeException, with code like:

F.raiseError(io.grpc.Status.INVALID_ARGUMENT
  .withDescription("Foo can not be provided with bar")
  .asRuntimeException())

This construction is somewhat awkward, we should instead have an API in this library for it.
Assuming that the approach remains to fail the response (or stream) for a non-Ok response, we could have something like Status.invalidArgument("Foo can not be provided with bar"), that returns a failed F with appropriate exception.

The full list of status codes is: https://github.com/grpc/grpc/blob/master/doc/statuscodes.md

Use io.grpc.Grpc factory methods instead of NettyServerBuilder/NettyChannelBuilder in docs

Hi!
I want to propose changing the docs from importing io.grpc.netty.shaded.io.grpc.netty.* package and using NettyServerBuilder and NettyChannelBuilder factory classes that feel like implementation classes, to using instead Grpc.newServerBuilderForPort and Grpc.newChannelBuilderForAddress factory methods from io.grpc package in the documentation.

Will be happy to open a PR.

Performance in benchmarks

Hi

I am opening this issue to document some findings about the fs2-grpc performance in this benchmark. I started this journey investigating why the akka-grpc results were so bad (https://discuss.lightbend.com/t/akka-grpc-performance-in-benchmarks/8236/) but then got curious what would be the numbers for other implementations...

The fs2-grpc implementation of the benchmark was done in this PR and the results I got were

Benchmark info:
37a7f8b Mon, 17 May 2021 16:06:05 +0100 João Ferreira scala zio-grpc implementatio
Benchmarks run: scala_fs2_bench scala_akka_bench scala_zio_bench java_hotspot_grpc_pgc_bench
GRPC_BENCHMARK_DURATION=50s
GRPC_BENCHMARK_WARMUP=5s
GRPC_SERVER_CPUS=3
GRPC_SERVER_RAM=512m
GRPC_CLIENT_CONNECTIONS=50
GRPC_CLIENT_CONCURRENCY=1000
GRPC_CLIENT_QPS=0
GRPC_CLIENT_CPUS=9
GRPC_REQUEST_PAYLOAD=100B
-----
Benchmark finished. Detailed results are located in: results/211705T162018
--------------------------------------------------------------------------------------------------------------------------------
| name               |   req/s |   avg. latency |        90 % in |        95 % in |        99 % in | avg. cpu |   avg. memory |
--------------------------------------------------------------------------------------------------------------------------------
| java_hotspot_grpc_pgc |   59884 |       16.19 ms |       40.65 ms |       54.12 ms |       88.15 ms |  256.21% |     204.7 MiB |
| scala_akka         |    7031 |      141.70 ms |      281.35 ms |      368.74 ms |      592.53 ms |  294.91% |    175.44 MiB |
| scala_fs2          |    7005 |      142.20 ms |      231.57 ms |      266.35 ms |      357.07 ms |  274.57% |    351.34 MiB |
| scala_zio          |    6835 |      145.74 ms |      207.45 ms |      218.25 ms |      266.37 ms |  242.61% |    241.43 MiB |
--------------------------------------------------------------------------------------------------------------------------------

I did some profiling with JFR and wanted to share the results

The biggest problem is GC:

image

Threads look fine:
image

Memory:

image

And the culprits are scalapb.GeneratedMessageCompanion.parseFrom, fs2.grpc.server.Fs2ServerCall#sendMessage. There is also a lot of cats.effect.* stuff...

e2e tests

As suggested by @fiadliel we should introduce some more tests that cover:

  • client, server startup
  • e2e single-single request/response
  • e2e single-stream request/response
  • e2e stream-single request/response
  • e2e stream-stream (client closes first) request/response
  • e2e stream-stream (server closes first) request/response

There are a lot more possible scenarios (e.g. errors, test against different language implementations), but it's some of the most basic test requirements.

Server middleware support

Overview

I'd like to be able to configure a server with a middleware that would run on each request, similar to what is possible with http4s middleware.

Where this idea has come from is I've been writing servers that use otel4s to attach to existing traces (passed to the server as metadata). This works fine, but the problem is this needs to be done for every server implementation. I would instead like to be able to do this in a generic way and be able to configure the server with this behaviour.

I can't really use gRPC interceptors for this, because the state of the context in cats.mtl.Local (which otel4s and other libs use) is scoped only to the effect passed into local.

From looking into the code, we could intercept the request in the Fs2ServerCallHandler and wrap the implementation: (Request, Metadata) => F[Response] in a middleware. Then allow passing middlewares in the GeneratedCompanion.service.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.