tmaxmax / go-sse

Fully featured, spec-compliant HTML5 server-sent events library

Home Page: https://pkg.go.dev/github.com/tmaxmax/go-sse

License: MIT License


go-sse's Introduction

go-sse


Lightweight, fully spec-compliant HTML5 server-sent events library.

Installation and usage

Install the package using go get:

go get -u github.com/tmaxmax/go-sse

It is strongly recommended to use tagged versions of go-sse in your projects. The master branch contains tested but unreleased and possibly undocumented changes, which may break backwards compatibility – use it with caution.

The library provides both server-side and client-side implementations of the protocol. The implementations are completely decoupled and unopinionated: you can connect to a server created using go-sse from the browser and you can connect to any server that emits events using the client!

If you are not familiar with the protocol or are not sure how it works, read MDN's guide for using server-sent events. The spec is also a useful read!

go-sse promises to support the Go versions supported by the Go team – that is, the 2 most recent major releases.

Implementing a server

Providers and why they are vital

First, a server instance has to be created:

import "github.com/tmaxmax/go-sse"

s := &sse.Server{} // zero value ready to use!

The sse.Server type also implements the http.Handler interface, but the server is framework-agnostic: see the ServeHTTP implementation to learn how to implement your own custom logic. It also has some additional configuration options:

s := &sse.Server{
    Provider: /* what goes here? find out next! */,
    OnSession: /* see Go docs for this one */,
    Logger: /* see Go docs for this one, too */,
}

What is this "provider"? A provider is an implementation of the publish-subscribe messaging system:

type Provider interface {
    // Publish a message to all subscribers of the given topics.
    Publish(msg *Message, topics []string) error
    // Add a new subscriber that is unsubscribed when the context is done.
    Subscribe(ctx context.Context, sub Subscription) error
    // Cleanup all resources and stop publishing messages or accepting subscriptions.
    Shutdown(ctx context.Context) error
}

The provider is what dispatches events to clients. When you publish a message (an event), the provider distributes it to all connections (subscribers). It is the central piece of the server: it determines the maximum number of clients your server can handle, the latency between broadcasting events and receiving them client-side, and the maximum message throughput supported by your server. As different use cases have different needs, go-sse allows you to plug in your own system. Examples of such external systems are message brokers like Redis or NATS.

If an external system is required, an adapter that satisfies the Provider interface must be created so it can be used with go-sse. To implement such an adapter, read the Provider documentation for the implementation requirements! And maybe share it with others: go-sse is built with reusability in mind!
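For illustration only, here is a rough skeleton of such an adapter – the type, its field and the external system are hypothetical, and the method bodies are just placeholders describing what a real adapter would do:

import (
    "context"

    "github.com/tmaxmax/go-sse"
)

// BrokerProvider is a hypothetical adapter which forwards events
// to and from an external messaging system.
type BrokerProvider struct {
    // e.g. your external system's client handle
}

func (p *BrokerProvider) Publish(msg *sse.Message, topics []string) error {
    // serialize msg and hand it to the external system for each topic
    return nil
}

func (p *BrokerProvider) Subscribe(ctx context.Context, sub sse.Subscription) error {
    // register the subscriber, forward matching events to sub.Client,
    // and unsubscribe when ctx is done
    <-ctx.Done()
    return nil
}

func (p *BrokerProvider) Shutdown(ctx context.Context) error {
    // release resources; stop publishing and accepting subscriptions
    return nil
}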

But in most cases the power and scalability these external systems bring is not necessary, so go-sse comes with a default provider built in. Read further!

Meet Joe, the default provider

The server works by default, without explicitly configuring a provider: go-sse brings you Joe, the trusty, pure Go pub-sub implementation, who handles all your events out of the box! Befriend Joe as follows:

import "github.com/tmaxmax/go-sse"

joe := &sse.Joe{} // the zero value is ready to use!

and he'll dispatch events all day! By default, he has no memory of what events he has received, but you can help him remember and replay older messages to new clients using a ReplayProvider:

type ReplayProvider interface {
    // Put a new event in the provider's buffer.
    // If the provider automatically adds IDs as well,
    // the returned message will also have the ID set,
    // otherwise the input value is returned.
    Put(msg *Message, topics []string) *Message
    // Replay valid events to a subscriber.
    Replay(sub Subscription)
}

go-sse provides two replay providers by default, both of which hold the events in memory: ValidReplayProvider and FiniteReplayProvider. The first replays events that are still valid (not expired), while the second replays a finite number of the most recent events. For example:

joe = &sse.Joe{
    ReplayProvider: &sse.ValidReplayProvider{TTL: time.Minute * 5}, // let's have events expire after 5 minutes 
}

will tell Joe to replay all valid events! Replay providers can do so much more (for example, add IDs to events automatically): read the docs on how to use the existing ones and how to implement yours.

You can also implement your own replay providers: maybe you need persistent storage for your events? Or perhaps event validity is determined by criteria other than expiry time? And if you think your replay provider may be useful to others, you are encouraged to share it!
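To sketch the shape of such an implementation (everything here is hypothetical – the type name, its storage and the comments only describe what a real provider would do):

// DatabaseReplayProvider is a hypothetical replay provider
// backed by persistent storage.
type DatabaseReplayProvider struct {
    // e.g. a handle to your storage backend
}

func (p *DatabaseReplayProvider) Put(msg *sse.Message, topics []string) *sse.Message {
    // store msg for each topic; return it unchanged, as no ID is assigned here
    return msg
}

func (p *DatabaseReplayProvider) Replay(sub sse.Subscription) {
    // load the events on sub.Topics that are newer than sub.LastEventID
    // and write them to sub.Client
}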

go-sse created the ReplayProvider interface mainly for Joe, but it encourages you to integrate it with your own Provider implementations, where suitable.
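Putting it together, a server that uses Joe with the replay provider configured above can be wired up like this (a sketch based on the configuration fields shown earlier):

s := &sse.Server{
    Provider: &sse.Joe{
        ReplayProvider: &sse.ValidReplayProvider{TTL: time.Minute * 5},
    },
}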

Publish your first event

To publish events from the server, we use the sse.Message struct:

import "github.com/tmaxmax/go-sse"

m := &sse.Message{}
m.AppendData("Hello world!", "Nice\nto see you.")

Now let's send it to our clients:

var s *sse.Server

s.Publish(m)

This is how clients will receive our event:

data: Hello world!
data: Nice
data: to see you.

You can also see that go-sse takes care of splitting input by lines into new fields, as required by the specification.

Keep in mind that replay providers, such as the ValidReplayProvider used above, will panic if they receive events without IDs. To have our event expire as configured, we must set an ID for it:

m.ID = sse.ID("unique")

This is how the event will look:

id: unique
data: Hello world!
data: Nice
data: to see you.

Now that it has an ID, the event will be considered expired 5 minutes after it's been published – it won't be replayed to clients after the expiry!

sse.ID is a function that returns an EventID – a special type that denotes an event's ID. An ID must not have newlines, so we must use special functions which validate the value beforehand. The ID constructor function we've used above panics (it is useful when creating IDs from static strings), but there's also NewID, which returns an error indicating whether the value was successfully converted to an ID or not:

id, err := sse.NewID("invalid\nID")

Here, err will be non-nil and id will be an unset value: no id field will be sent to clients if you set an event's ID using that value!

Setting the event's type (the event field) is equally easy:

m.Type = sse.Type("The event's name")

Like IDs, types cannot have newlines. You are provided with constructors that follow the same convention: Type panics, NewType returns an error. Read the docs to find out more about messages and how to use them!
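For dynamic values, NewType follows the same pattern as NewID above – here userProvidedName is a hypothetical variable holding untrusted input:

if t, err := sse.NewType(userProvidedName); err == nil {
    m.Type = t
} else {
    // the value contained a newline; report it or leave the event unnamed
}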

The server-side "Hello world"

Now, let's put everything that we've learned together! We'll create a server that sends a "Hello world!" message every second to all its clients, with Joe's help:

package main

import (
    "log"
    "net/http"
    "time"

    "github.com/tmaxmax/go-sse"
)

func main() {
    s := &sse.Server{}

    go func() {
        m := &sse.Message{}
        m.AppendData("Hello world")

        for range time.Tick(time.Second) {
            _ = s.Publish(m)
        }
    }()

    if err := http.ListenAndServe(":8000", s); err != nil {
        log.Fatalln(err)
    }
}

Joe is our default provider here, as no provider is set on the server. The server is already an http.Handler, so we can use it directly with http.ListenAndServe.

Also see a more complex example!

This is far from a complete presentation – make sure to read the docs in order to use go-sse to its full potential!

Using the client

Creating a client

We will use the sse.Client type for connecting to event streams:

type Client struct {
    HTTPClient              *http.Client
    OnRetry                 backoff.Notify
    ResponseValidator       ResponseValidator
    MaxRetries              int
    DefaultReconnectionTime time.Duration
}

As you can see, it uses a net/http client. It also uses the cenkalti/backoff library for implementing auto-reconnect when a connection to a server is lost. Read the client docs and the Backoff library's docs to find out how to configure the client. We'll use the default client the package provides for further examples.
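For illustration, a client with explicit settings might be constructed roughly like this – the values are arbitrary and only show where each knob lives:

client := &sse.Client{
    HTTPClient:              &http.Client{}, // or one with a custom transport, proxy etc.
    ResponseValidator:       sse.DefaultValidator,
    MaxRetries:              5,
    DefaultReconnectionTime: 5 * time.Second,
}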

Initiating a connection

We must first create an http.Request - yup, a fully customizable request:

req, err := http.NewRequestWithContext(ctx, http.MethodGet, "host", http.NoBody)

Any kind of request is valid as long as your server handler supports it: you can do a GET, a POST, send a body – whatever! The context is used, as always, for cancellation: to stop receiving events, you will have to cancel the context. Let's initiate a connection with this request:

import "github.com/tmaxmax/go-sse"

conn := sse.DefaultClient.NewConnection(req)
// you can also do sse.NewConnection(req)
// it is a utility function that calls the
// NewConnection method on the default client

Subscribing to events

Great! Let's imagine the event stream looks as follows:

data: some unnamed event

event: I have a name
data: some data

event: Another name
data: some data

To receive the unnamed events, we subscribe to them as follows:

unsubscribe := conn.SubscribeMessages(func (event sse.Event) {
    // do something with the event
})

To receive the events named "I have a name":

unsubscribe := conn.SubscribeEvent("I have a name", func (event sse.Event) {
    // do something with the event
})

If you want to subscribe to all events, regardless of their name:

unsubscribe := conn.SubscribeToAll(func (event sse.Event) {
    // do something with the event
})

All Subscribe methods return a function that, when called, tells the connection to stop calling the corresponding callback.

In order to work with events, the sse.Event type has some fields and methods exposed:

type Event struct {
    LastEventID string
    Name        string
    Data        string
}

Pretty self-explanatory, but make sure to read the docs!
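For instance, a callback that inspects all three fields could look like this:

unsubscribe := conn.SubscribeToAll(func(e sse.Event) {
    fmt.Printf("last event ID: %q, name: %q, data: %q\n", e.LastEventID, e.Name, e.Data)
})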

Now, with this knowledge, let's subscribe to all unnamed events and, when the connection is established, print their data:

unsubscribe := conn.SubscribeMessages(func(event sse.Event) {
    fmt.Printf("Received an unnamed event: %s\n", event.Data)
})

Establishing the connection

Great, we are subscribed now! Let's start receiving events:

err := conn.Connect()

By calling Connect, the request created above will be sent to the server, and if successful, the subscribed callbacks will be called when new events are received. Connect returns only after all callbacks have finished executing. To stop calling a certain callback, call the unsubscribe function returned when subscribing. You can also subscribe new callbacks after calling Connect from a different goroutine. When using a context.Context to stop the connection, the error returned will be the context error – be it context.Canceled, context.DeadlineExceeded or a custom cause (when using context.WithCancelCause). In other words, a successfully closed Connection will always return an error – if the context error is not relevant, you can ignore it. For example:

if err := conn.Connect(); !errors.Is(err, context.Canceled) {
    // handle error
}

A context created with context.WithCancel, or one with context.WithCancelCause and cancelled with the error context.Canceled is assumed above.
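As a sketch of the custom-cause variant described above – errStreamDone is a sentinel error defined just for this example, and we assume (as stated) that the cause is what Connect returns:

var errStreamDone = errors.New("stream done")

ctx, cancel := context.WithCancelCause(context.Background())

go func() {
    // stop the stream after a minute, with our custom cause
    time.Sleep(time.Minute)
    cancel(errStreamDone)
}()

req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:8000", http.NoBody)
conn := sse.NewConnection(req)
conn.SubscribeMessages(func(e sse.Event) { /* handle the event */ })

if err := conn.Connect(); !errors.Is(err, errStreamDone) {
    // handle unexpected errors
}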

There may be situations where the connection does not have to live indefinitely – for example when using the OpenAI API. In those situations, configure the client not to retry the connection and ignore io.EOF on return:

client := sse.Client{
    Backoff: sse.Backoff{
        MaxRetries: -1,
    },
    // other settings...
}

req, _ := http.NewRequest(http.MethodPost, "https://api.openai.com/...", body)
conn := client.NewConnection(req)

conn.SubscribeMessages(/* callback */)

if err := conn.Connect(); !errors.Is(err, io.EOF) {
    // handle error
}

Connection lost?

Either way, after receiving so many events, something went wrong and the server is temporarily down. Oh no! As a last hope, it has sent us the following event:

retry: 60000
: that's a minute in milliseconds and this
: is a comment which is ignored by the client

No sweat, though! The connection will automatically be reattempted after a minute, when we hope the server will be back up again. Cancelling the request's context will cancel any reconnection attempt, too.

If the server doesn't set a retry time, the client's DefaultReconnectionTime is used.
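To observe reconnection attempts, the OnRetry hook from the Client struct above can be used; a small sketch:

client := &sse.Client{
    OnRetry: func(err error, wait time.Duration) {
        log.Printf("connection lost (%v); reconnecting in %v", err, wait)
    },
}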

The "Hello world" server's client

Let's use what we know to create a client for the previous server example:

package main

import (
    "fmt"
    "net/http"
    "os"

    "github.com/tmaxmax/go-sse"
)

func main() {
    r, _ := http.NewRequest(http.MethodGet, "http://localhost:8000", nil)
    conn := sse.NewConnection(r)

    conn.SubscribeMessages(func(ev sse.Event) {
        fmt.Printf("%s\n\n", ev.Data)
    })

    if err := conn.Connect(); err != nil {
        fmt.Fprintln(os.Stderr, err)
    }
}

Yup, this is it! We are using the default client to receive all the unnamed events from the server. The output will look like this, when both programs are run in parallel:

Hello world!

Hello world!

Hello world!

Hello world!

...

See the complex example's client too!

License

This project is licensed under the MIT license.

Contributing

The library is in its early stages, so contributions are vital – I'm so glad you wish to improve go-sse! Maybe start by opening an issue first, to describe the intended modifications and discuss how to integrate them. Open PRs to the master branch and wait for CI to complete. If all is clear, your changes will soon be merged! Also, make sure your changes come with an extensive set of tests and that the code is formatted.

Thank you for contributing!

go-sse's People

Contributors

aldld, hugowetterberg, nkcr, tmaxmax


go-sse's Issues

Expose retry timeout configuration

Hi here. Good library.

I would like to keep my client connection forever as long as I want but with the ability to cancel.

Cancelling is not a problem, but it seems configuring the retry option is not baked into the code. Can this be done?

Currently I do not even know what the max retry backoff is so I'm in the dark on that one. It seems like the only way is to enclose the conn.Connect() inside a loop, but that doesn't feel right because it's already handling the timing inside.

👋 Are you using go-sse?

Hi there,

If you're using go-sse for any kind of project, or trying to evaluate whether go-sse is the right tool for your project, please let me know here what you're using go-sse for, or what you're planning to use it for!

This will help me tremendously in planning the development of the library, designing its API and testing it properly. And if you're not sure whether you should use it or feel like it's missing something, by chiming in here we can find a path to make go-sse the right choice for you!

Thank you for the support!

P.S. Make sure to see the roadmap to v1 and the proposals to keep up to date with go-sse's development!

Help getting started

Hi, thanks so much for doing this package. It's just what I need at the point I'm at in my project.

I copied from the complex and simple examples.

I have this much working

UPDATE: made sure it works, here's a proof of concept https://github.com/gostega/go-sse-poc

main.go

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"surfactor/pkg/controllers"
	"surfactor/pkg/models"
	"text/template"
	"time"

	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	"github.com/tmaxmax/go-sse"
)

const (
	topicRandomNumbers = "numbers"
	topicStateChanges  = "evt_statechange"
)

var sseHandler = &sse.Server{
	Provider: &sse.Joe{
		ReplayProvider: &sse.ValidReplayProvider{
			TTL:        time.Minute * 5,
			GCInterval: time.Minute,
			AutoIDs:    false,
		},
	},
	Logger: nil,
	OnSession: func(s *sse.Session) (sse.Subscription, bool) {
		topics := s.Req.URL.Query()["topic"]
		for _, topic := range topics {
			if topic != topicRandomNumbers && topic != topicStateChanges {
				fmt.Fprintf(s.Res, "invalid topic %q; supported are %q, %q", topic, topicRandomNumbers, topicStateChanges)
				s.Res.WriteHeader(http.StatusBadRequest)
				return sse.Subscription{}, false
			}
		}
		if len(topics) == 0 {
			// Provide default topics, if none are given.
			topics = []string{topicRandomNumbers, topicStateChanges}
		}

		return sse.Subscription{
			Client:      s,
			LastEventID: s.LastEventID,
			Topics:      append(topics, sse.DefaultTopic), // the shutdown message is sent on the default topic
		}, true
	},
}

func sseTest(c echo.Context) error {
	// s := &sse.Server{}

	// seededRand := rand.New(rand.NewSource(time.Now().UnixNano()))
	// const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
	go func() {

		// randID := seededRand.Intn(10)
		// b := make([]byte, 10)

		// for range time.Tick(time.Second) {
		// 	m := &sse.Message{}
		// 	m.ID = sse.ID(fmt.Sprintf("thread:%d", randID))
		// 	for i := range b {
		// 		b[i] = charset[seededRand.Intn(len(charset))]
		// 	}

		// 	m.AppendData(string(b))
		// 	_ = sseHandler.Publish(m)
		// }
		m := &sse.Message{}
		m.ID = sse.ID(topicStateChanges)
		m.AppendData("Connected")
		_ = sseHandler.Publish(m)

	}()

	sseHandler.ServeHTTP(c.Response(), c.Request())

	return nil
}

func main() {

	// -------------------------------------
	// ECHO ROUTER INITIALISATION
	// -------------------------------------
	e := echo.New()
	e.Use(middleware.Logger())
	// e.Use(middleware.Recover()) // only works with panics, not 'fatal'
	e.Renderer = NewTemplates()

	e.GET("/events", sseTest)

	/* Start the http server */
	go e.Logger.Fatal(e.Start(port))

	// would also be nice to send a closing message here. "bye" or something
	// I tried this but nothing happens. Perhaps it only works if my app gets terminated differently? I'm just using CTRL+C
	mBye := &sse.Message{Type: sse.Type("close")}
	mBye.ID = sse.ID(topicStateChanges)
	mBye.AppendData("Bye")
	defer sseHandler.Publish(mBye)
}

It works: if I have the random string code uncommented, it will send me random strings every second.
If I have it commented out, like above, then I only get the 'Connected' message, which I included just to show that the connection was successful.

$ curl localhost:9090/events?topic=evt_statechange
id: evt_statechange
data: Connected

The part that I'm stuck on now is publishing messages from elsewhere in my code. I have other modules that do things like update the state on a node and interact with a database. How do I call `_ = sseHandler.Publish("mymessage")` from elsewhere?

For example I have another file models/node.go

package models

type Node struct {
	gorm.Model
	Name        string `json:"name"`
	State       State  `json:"state"`
	Target      string `json:"target"`
	Description string `json:"description"`
	Parents     []Path `gorm:"many2many:nodes_paths;"`
	fromState   State  //making the variable start with lowercase makes it invisible to json
}

func (n *Node) AfterUpdate(tx *gorm.DB) (err error) {
	if n.State != n.fromState {
		// how do I from here publish a message to the client?
	}
	return nil
}

Server Setup

Hi,

I have a few questions. I am working on a side project that just streams stock api data to a React app. I have another microservice that ingests the stock data and writes it to a Postgres DB but I'm also planning to add Redis (or some Pub/Sub system) to emit it to my api microservice.

I'm exploring the use case for SSE vs websockets and I think SSE seems to fit my use case more but am wondering if this is scalable across many topics (i.e user from React app picks some stocks to listen to price updates from).

It just seems like there's a central server for handling all topics but are there performance benchmarks / some suggestions on system design for my use case?

Thanks

How to increase the 65K message limit on the receiving side?

I'm attempting to use go-sse in the hiveot project (iot digital twin hub) and ran into a limitation when pushing messages from the hub to a client via SSE. Everything works well when the messages are smaller than 65K. When they exceed 65K they never arrive. The sending side however does not report any error.

Some digging found that Connection.read() creates parser.New(), which calls bufio.NewScanner() which sets a default buffer of MaxScanTokenSize, which is indeed 65K.

Scanner.Buffer seems to be the way to change this buffer. However I don't see a way to do this in between Connection.read() and the start of scanning.

What is the intended way to change the buffer size after creating a connection?

Alternative SSE client

The current client API offers a great deal of flexibility:

  • with an sse.Client multiple Connections with the same configuration can be made
  • there can be multiple event listeners (for distinct or the same event)
  • event listeners can be added or removed after the connection is established
  • event listeners can listen to a single event type or all of them.

This doesn't come for free, though: both the user-facing API and the implementation code are complex, and the client uses a bit more resources, generates more garbage and must ensure serialized concurrent access to internal state.

As of now, the instantiation of a connection with cancellation, some custom configuration and sending data on a channel looks as follows:

ctx, cancel := context.WithCancel(context.Background()) // or other means of creating a context, might come from somewhere
defer cancel()

client := sse.Client{
	/* your options */
}

r, _ := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:8000", http.NoBody)
conn := client.NewConnection(r)

ch := make(chan sse.Event)
go func() {
	for ev := range ch {
		fmt.Println("%s\n\n", event.Data)
	}
}()

conn.SubscribeMessages(func(event sse.Event) {
	ch <- event
})

if err := conn.Connect(); err != nil {
	fmt.Fprintln(os.Stderr, err)
}

I've added the channel because from what I have observed, most users of the library create callbacks which mainly send the events on a channel to be consumed elsewhere.

I think this is quite a mouthful, and I wonder whether the aforementioned flexibility is used enough to justify the current API.

Here's another way in which I think the client could be designed. Instead of having a Client type and a Connection type with many methods, we could instead have the following:

package sse

// Connect does the HTTP request, receives the events from the server and sends them
// on the given channel.
//
// Returns errors if any of the parameters are invalid. Besides that it has the exact same
// behavior as `sse.Connection.Connect` has.
func Connect(req *http.Request, msgs chan<- Event, config *ConnectConfig) error

type ConnectConfig struct {
	/* the same stuff that is on sse.Client currently */
}

Usage would look as follows:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch := make(chan sse.Event)
go func() {
	for ev := range ch {
		if ev.Type == "" {
			fmt.Printf("%s\n\n", event.Data)
		}
	}
}()

config := &sse.ConnectConfig{
	/* your options */
}

r, _ := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:8000", http.NoBody)
if err := sse.Connect(r, ch, config); err != nil {
	fmt.Fprintln(os.Stderr, err)
}

It is not that much shorter, but assuming that the context comes from elsewhere and that the configuration is already defined, the code necessary for establishing a connection is significantly shorter – creating an http.Request and calling Connect. Connecting with the default configuration would also not need a separate top-level function – just pass nil instead of a ConnectConfig!

There are two important changes here, though:

  • checking that the received events are of the desired type is now the user's responsibility
  • new event listeners cannot be added (so easily) on the fly – the user would have to implement this themselves

For example, if we receive three different event types and we handle them differently, previously one could do:

conn.SubscribeEvent("a", func(e sse.Event) {
	aCh <- e
})
conn.SubscribeEvent("b", func(e sse.Event) {
	bCh <- e
})
conn.SubscribeEvent("c", func(e sse.Event) {
	cCh <- e
})
if err := conn.Connect(); err != nil {
	// handle error
}

With this change, it would look like this:

evs := make(chan sse.Event)
go func() {
	for e := range evs {
		switch e.Type {
		case "a":
			aCh <- e // or just handle them here instead of sending them on another channel
		case "b":
			bCh <- e
		case "c":
			cCh <- e
		}
	}
}()
if err := sse.Connect(req, evs, nil); err != nil {
	// handle error
}

On the flip side, simple requests would be easier to make. Consider a request to ChatGPT:

prompt, _ := json.Marshal(OpenAIRequest{
	Model:    "gpt-4-1106-preview",
	Messages: msgs,
	Stream:   true,
})
r, _ := http.NewRequest(http.MethodPost, "https://api.openai.com/v1/chat/completions", strings.NewReader(string(prompt)))
r.Header.Add("Authorization", fmt.Sprintf("Bearer %s", OpenAIKey))
r.Header.Add("Content-Type", "application/json")
conn := sse.NewConnection(r)

conn.SubscribeMessages(func(ev sse.Event) {
	events <- ev // it is processed elsewhere
})

if err := conn.Connect(); err != nil {
	/* handle error */
}

This would be the new version:

prompt, _ := json.Marshal(OpenAIRequest{
	Model:    "gpt-4-1106-preview",
	Messages: msgs,
	Stream:   true,
})
r, _ := http.NewRequest(http.MethodPost, "https://api.openai.com/v1/chat/completions", strings.NewReader(string(prompt)))
r.Header.Add("Authorization", fmt.Sprintf("Bearer %s", OpenAIKey))
r.Header.Add("Content-Type", "application/json")

if err := sse.Connect(r, events, nil); err != nil {
	/* handle error */
}

There are obvious benefits:

  • much less boilerplate – no more NewConnection -> SubscribeMessages -> Connect
  • it is not possible to connect without receiving the messages
  • given that the connection code is shorter, focus is moved on the creation of the request
  • handling the response data happens directly in the user's code – there's no function boundary to separate business logic, no inversion of control

As an analogy, imagine if net/http.Client were used something like this:

conn := http.Client.NewConnection(req)
conn.HandleResponse(func(res *http.Response) {
	// do something with the response
})
if err := conn.Connect(); err != nil {
	// handle error
}

It would be painful to use.

The main advantage of the new API would be, I believe, that the control of the response is fully in the library user's hands. There are no callbacks one needs to reason about; there is no need for the user to look up the source code to find out how the Connection behaves in various respects – for example, in what order the event listeners are called; finally, in a paradoxical manner there would be one single way to do things: for example, if one wants to handle multiple event types, currently they can register multiple callbacks for each event, or write the same switch code as above inside a callback passed to SubscribeAll. Also, it would be much easier to maintain – this change would result in ~200LOC and 6 public API entities being removed. This reduction in API surface reduces documentation and in the end how much the user must learn about the library in order to use it effectively.

Looking forward to your input regarding this API change!

What happens if the server has an internal error when using go-sse as a client?

Here is my test code:

r, _ := http.NewRequestWithContext(c, http.MethodPost, "http://xxxx/v1/generate_stream", strings.NewReader(string(bodyS)))
conn := sse.NewConnection(r)

unsubscribe := conn.SubscribeToAll(func(event sse.Event) {
	eventS, _ := json.Marshal(event)
	switch event.Type {
	case "cycles", "ops":
		fmt.Printf("Metric %s: %s\n", event.Type, event.Data)
	case "close":
		fmt.Println("Server closed!")
	default: // no event name
		util.Log.Info(event.Data)
		messages <- string(eventS)
	}
	if event.Data == "[DONE]" || strings.Contains(event.Data, `finish_reason\":"`) {
		close(finish)
	}
})

go func() {
	<-finish
	util.Log.Debug("!!!!!!!finish")
	unsubscribe()
}()

if err := conn.Connect(); err != nil {
	util.Log.Error(err.Error())
}

I used the wrong request body and found that the subscriber seems to get stuck and cannot quit.

Expose SSE events parser (`internal/parser`)

Hey, I was searching around for only an SSE events parser - encoder/decoder. This was the only library I could find that isn't ancient - thanks for your work!

I'm using an OpenAPI spec which generates ready-to-use API calls on the client side – which is why I am not using the client in go-sse (it would take time to make it play well with the OpenAPI-generated code). However, I'm interested in using the code inside internal/parser. I could also help you with exposing it if you had a few ideas in mind.

I saw that the code in https://github.com/donovanhide/eventsource/blob/master/decoder.go was much simpler and shorter than what you have written up – is there any advantage to your implementation that I'm missing? Please do let me know!

Thanks!

Originally posted by @Pramodh-G in #7 (comment)

Client connects with HTTP/2 but reconnects using HTTP/1.1

Hi,
I'm using the go-sse client to connect to a SSE server. This works fine.
The initial connection is established with HTTP/2.
When stopping and starting the server, the client automatically reconnects as expected.
The interesting bit however is that the reconnect uses HTTP/1.1.

Is there a way to force a reconnect using HTTP/2?

A stripped down version of the code to establish the connection:

req, err := http.NewRequest("GET", serverURL, bytes.NewReader([]byte{}))
req.Header.Add("Authorization", "bearer "+bearerToken)
sseCtx, sseCancelFn := context.WithCancel(context.Background())
req = req.WithContext(sseCtx)
sseClient := sse.Client{}
conn := sseClient.NewConnection(req)
newBuf := make([]byte, 0, 1024*65)
conn.Buffer(newBuf, myMaxSSEMessageSize)   // thank you for this feature
remover := conn.SubscribeToAll(mySSEHandler)
go func() {
    err := conn.Connect()
    remover()
}()

PS: This is with golang 1.21

Split code into multiple modules

I've been contemplating for a while now this change.

At the point of writing this, all code in go-sse is lumped in a single package: sse. Inside this package there are multiple logical units of code:

  • an SSE client
  • an SSE server
  • an implementation of a Provider – that is, a message broker – for servers
  • an event serializer & deserializer

There are some pain points which come with this configuration:

  • polluted public API
    How can FiniteReplayProvider and Backoff reside in the same package? Which type is for what exactly? Is the Logger for the client or for the server? API discoverability also suffers because of this – when typing sse. code completion goes crazy with all the names in the package.
  • names of all entities in the package must be unique
    sse.Message is a direct victim – it is actually an event, which is sent from the server. The name Message is nonsense here, but a name different from Event is required because Event is an event parsed by the client. They could be named ServerEvent and ClientEvent respectively, but that doesn't sit right with me – if I had multiple types like this with a Client- or Server- prefix, it would be as if there were multiple packages.
  • updates to a certain part of the package must bump the version of the entire package
    Should someone that uses only the client have to update the library because something server related was updated?
  • dependent code must import unused code
    Why should a new provider implementation import the client and the default provider when it really only needs the event serializer and maybe the server?

Furthermore, if for example other providers or logger adapters are implemented, where would they reside? They could live in another repository – but that would mean to either pollute my account with multiple repositories related to go-sse or create an organization and to spread issues, PRs and discussions across multiple places, which would make maintenance more difficult.

Here's the layout I propose (open to suggestions for new names):

  • sseclient – the SSE client
  • sseserver – the SSE server
  • providers/ssejoe – the Joe provider
  • ssewire – event serialization & deserialization ("wire" from "wire format")

Other modules that could exist:

  • ssedge – experimental APIs
  • providers/sse* – other providers
  • loggers/sse* – logger adapters for the server
  • examples – for various example usages of go-sse

This split into multiple modules is also important for the implementation of other providers and logger adapters. Without it, library users would pull in their projects transitive dependencies (hefty message broker clients, loggers) which they don't use.

Better modularity would also be encouraged, as API boundaries are clearly defined.

Disadvantages that I can see would be:

  • the short and convenient sse package name would be lost
    If the API surface of sseclient and sseserver is small enough then maybe they could be merged into sse?
  • there will be some technical hurdles in implementing this (from the separations into modules itself, to CI and versioning)
    The good part is that they need to be gotten right only once. To do the splitting, go.work is probably the tool – it must be investigated what can and cannot be done.

Looking forward to feedback on this possibly new layout! Do you prefer the current one instead? What's your take?

Retry doesn't seem to work

Created the client as:

	client := &sse.Client{
		HTTPClient:              sseClient,
		ResponseValidator:       sse.DefaultValidator,
		MaxRetries:              -1,
		DefaultReconnectionTime: 5 * time.Second,
		OnRetry: func(err error, wait time.Duration) {
			log.Warn().Msgf("Retry %v %v", err, wait)
		},
	}

	// code not shown for brevity
	req, _ := http.NewRequestWithContext(sseContext, http.MethodGet, cloud.JoinPath(sseEndpoint).String(), nil)

	conn := client.NewConnection(req)

	go func() {
		log.Info().Msg("Connecting to SSE...")
		if err := conn.Connect(); err != nil {
			log.Err(err).Msg("Failed to connect to SSE")
		}
		log.Info().Msg("Disconnected from SSE")
	}()

I found two problems:

  1. There is no retry if the server was not up when the client started
  2. There is no retry if the server is shut down after the connection is made

Memory leak in FiniteReplayProvider

The slice operations in the FiniteReplayProvider leads to an infinitely growing buffer of messages as the capacity of the buffer will increase on the first append after each dequeue. Reproduced the behaviour as a small code sample:

package main

import "fmt"

func main() {
	buf := make([]int, 0, 10)

	var (
		lastDequeue  int
		dequeueCount int
	)

	for i := 0; i < 200; i++ {
		buf = append(buf, i)

		if dequeueCount > 0 && lastDequeue == i-1 {
			println("\t- on append after dequeue:",
				"length", len(buf),
				"cap", cap(buf))
		}

		// Comparing length to cap with the assumption that it will
		// remain unchanged.
		if len(buf) == cap(buf) {
			fmt.Printf(
				"dequeue after %d iterations, length %d, cap %d\n",
				len(buf), cap(buf), 1+i-lastDequeue)

			lastDequeue = i
			dequeueCount++

			// Moving the slice ahead in backing array to dequeue,
			// length will now be one less, but next time we append
			// the capacity will increase to accommodate the new
			// item.
			buf = buf[1:]

			println("\t- after dequeue:",
				"length", len(buf),
				"cap", cap(buf))

		}
	}

	println()
	println("total dequeues:", dequeueCount)

	println("final length:", len(buf))
	println("final cap:", cap(buf))

	println()
	println("items:")

	// We will still retain all items minus the number of dequeues that have
	// been performed.
	for _, i := range buf {
		if i%10 == 0 {
			println()
		}

		fmt.Printf("%3d ", i)
	}
}

Output:

dequeue after 10 iterations, length 10, cap 10
	- after dequeue: length 9 cap 9
	- on append after dequeue: length 10 cap 18
dequeue after 18 iterations, length 18, cap 10
	- after dequeue: length 17 cap 17
	- on append after dequeue: length 18 cap 36
dequeue after 36 iterations, length 36, cap 20
	- after dequeue: length 35 cap 35
	- on append after dequeue: length 36 cap 72
dequeue after 72 iterations, length 72, cap 38
	- after dequeue: length 71 cap 71
	- on append after dequeue: length 72 cap 144
dequeue after 144 iterations, length 144, cap 74
	- after dequeue: length 143 cap 143
	- on append after dequeue: length 144 cap 288

total dequeues: 5
final length: 195
final cap: 288

items:
  5   6   7   8   9 
 10  11  12  13  14  15  16  17  18  19 
 20  21  22  23  24  25  26  27  28  29 
 30  31  32  33  34  35  36  37  38  39 
 40  41  42  43  44  45  46  47  48  49 
 50  51  52  53  54  55  56  57  58  59 
 60  61  62  63  64  65  66  67  68  69 
 70  71  72  73  74  75  76  77  78  79 
 80  81  82  83  84  85  86  87  88  89 
 90  91  92  93  94  95  96  97  98  99 
100 101 102 103 104 105 106 107 108 109 
110 111 112 113 114 115 116 117 118 119 
120 121 122 123 124 125 126 127 128 129 
130 131 132 133 134 135 136 137 138 139 
140 141 142 143 144 145 146 147 148 149 
150 151 152 153 154 155 156 157 158 159 
160 161 162 163 164 165 166 167 168 169 
170 171 172 173 174 175 176 177 178 179 
180 181 182 183 184 185 186 187 188 189 
190 191 192 193 194 195 196 197 198 199 

https://go.dev/play/p/Ut8T-tyB2-5

🚧 Roadmap to v1

Here's what's planned for v1, listed by priority:

  • Improve internal parsing API (#6, 1352b29)
  • Improve server-side message creation API
    • A refactor and simplification (#6)
    • A Message.Writer, which returns an io.Writer that can be used to write data fields to the event. (#9)
      subsequently removed (#11), it is superfluous
    • Replace AppendData with a simple Data string field
  • Improve server implementation
    • Fix memory leak in FiniteReplayProvider (#23)
      • Refactor ValidReplayProvider as a consequence of the changes made by the other PR
    • Add benchmarks for all server components and create some end-to-end performance testing of the server for common use-cases (input from the community would help, #24)
    • Re-evaluate the Session API, OnSession and MessageWriter – could we do without them and use standard net/http types instead?
  • Improve client
    • Modify response validators so they're able to signal if an error is temporary or not
    • Experiment and try to receive feedback on a new, potentially simpler API (#25)
  • General codebase improvements:
    • Remove all panics
      • Use constructors which return errors instead of lazily initialized objects which panic later at runtime if configuration parameters are invalid
    • Improve testability of all APIs
    • Refactor tests to ensure full coverage of all use cases and edge cases and remove flakiness
    • Fix or improve documentation, add more examples where necessary
  • Experiment with splitting the codebase into multiple modules and making it a monorepo using go.work (#26)
  • At least an additional provider implementation (e.g. for Redis, nats-io or other) (see also #13, #19)
  • Add event parser to the sse module? (the code in internal/parser – is there demand for this?)
  • Project process and discoverability improvements (as activity surrounding it grows)
    • Create a vulnerabilities reporting policy
    • Add known use-cases, projects which use go-sse, external provider implementations if any to README
    • Create issue templates, relevant labels for sorting etc.
  • The test of time – try to grow usage of library, solve issues or requests that may pop up, and wait for things to be stable

Out-of-order `SubscribeMessages` events

Here is my test case:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strings"

	"github.com/tmaxmax/go-sse"
)

type OpenAIRequest struct {
	Model    string    `json:"model"`
	Messages []Message `json:"messages"`
	Stream   bool      `json:"stream"`
}
type Message struct {
	Role    string `json:"role"`
	Content string `json:"content"`
}
type OpenAIResponse struct {
	OrderId           int
	ID                string `json:"id"`
	Object            string `json:"object"`
	Created           int    `json:"created"`
	Model             string `json:"model"`
	SystemFingerprint string `json:"system_fingerprint"`
	Choices           []struct {
		Index int `json:"index"`
		Delta struct {
			Content string `json:"content"`
		} `json:"delta"`
		FinishReason interface{} `json:"finish_reason"`
	} `json:"choices"`
}

func main() {
	msg := make(chan string)
	go requestOpenAI([]Message{
		{
			Role:    "user",
			Content: "Hello",
		},
	}, msg)
	finalMsg := ""
	for m := range msg {
		finalMsg += m
	}
	fmt.Println(finalMsg)
}

func requestOpenAI(msgs []Message, events chan string) {
	OpenAIKey := "sk-xxx"
	defer close(events)
	prompt, _ := json.Marshal(OpenAIRequest{
		Model:    "gpt-4-1106-preview",
		Messages: msgs,
		Stream:   true,
	})
	r, _ := http.NewRequest(http.MethodPost, "https://api.openai.com/v1/chat/completions", strings.NewReader(string(prompt)))
	r.Header.Add("Authorization", fmt.Sprintf("Bearer %s", OpenAIKey))
	r.Header.Add("Content-Type", "application/json")
	conn := sse.NewConnection(r)

	orderId := 0
	conn.SubscribeMessages(func(ev sse.Event) {
		orderId++
		var resp OpenAIResponse
		json.Unmarshal([]byte(ev.Data), &resp)
		resp.OrderId = orderId
		if len(resp.Choices) == 0 || resp.Choices[0].FinishReason == "stop" {
			return
		}
		fmt.Printf("Got event, evtId: %d, msg: %s\n", orderId, resp.Choices[0].Delta.Content)
		events <- resp.Choices[0].Delta.Content
	})

	if err := conn.Connect(); err != nil {
		log.Println(err)
	}
}

Run output:

go run test.go
Got event, evtId: 1, msg: 
Got event, evtId: 3, msg: Hello
Got event, evtId: 3, msg:  How
Got event, evtId: 4, msg: !
Got event, evtId: 5, msg:  assist
Got event, evtId: 6, msg:  can
Got event, evtId: 7, msg:  I
Got event, evtId: 8, msg: ?
Got event, evtId: 9, msg:  you
Got event, evtId: 10, msg:  today
Hello How! assist can I? you today

Race condition in server using sub.Client.Send()

There seems to be a race condition sending messages from the server to the client.

In sse.Server's OnSession, a subscription is created as follows:

gosse := &sse.Server{}
gosse.OnSession = func(sseSession *sse.Session) (sse.Subscription, bool) {
	sub := sse.Subscription{
		Client:      sseSession,
		LastEventID: sseSession.LastEventID,
		Topics:      []string{sse.DefaultTopic},
	}
	return sub, true
}

In a separate goroutine, messages are sent from the server to this client:

sseMsg := sse.Message{}
sseMsg.AppendData(myPayload)
sseMsg.ID, _ = sse.NewID(eventID)
sseMsg.Type, err = sse.NewType(eventType)
err = sub.Client.Send(&sseMsg)
_ = sub.Client.Flush()

This works most of the time, but running it with go test -race exposes a race condition:

WARNING: DATA RACE
Write at 0x00c0001764d2 by goroutine 380:
  net/http.(*http2responseWriter).handlerDone()
      /snap/go/10627/src/net/http/h2_bundle.go:6860 +0x6d
  net/http.(*http2serverConn).runHandler.func1()
      /snap/go/10627/src/net/http/h2_bundle.go:6230 +0x404
  runtime.deferreturn()
      /snap/go/10627/src/runtime/panic.go:477 +0x30
  net/http.(*http2serverConn).scheduleHandler.func1()
      /snap/go/10627/src/net/http/h2_bundle.go:6167 +0x5d

Previous read at 0x00c0001764d2 by goroutine 381:
  net/http.(*http2responseWriterState).writeChunk()
      /snap/go/10627/src/net/http/h2_bundle.go:6562 +0xa84
  net/http.http2chunkWriter.Write()
      /snap/go/10627/src/net/http/h2_bundle.go:6428 +0x48
  bufio.(*Writer).Flush()
      /snap/go/10627/src/bufio/bufio.go:642 +0xf0
  net/http.(*http2responseWriter).FlushError()
      /snap/go/10627/src/net/http/h2_bundle.go:6693 +0x9e
  github.com/tmaxmax/go-sse.flusherErrorWrapper.Flush()
      /home/henk/go/pkg/mod/github.com/tmaxmax/[email protected]/session.go:160 +0x46
  github.com/tmaxmax/go-sse.(*flusherErrorWrapper).Flush()
      <autogenerated>:1 +0x17
  github.com/tmaxmax/go-sse.(*Session).Flush()
      /home/henk/go/pkg/mod/github.com/tmaxmax/[email protected]/session.go:61 +0x83
  github.com/hiveot/hub/runtime/transports/httpstransport.(*HttpsTransport).Start.NewSSEServer.NewGoSSEServer.func1.1()
      /home/henk/dev/hiveot/hub/runtime/transports/httpstransport/sseserver/GoSseServer.go:58 +0x57a

The documentation of the Send function states that the interface does not have to be thread-safe:
(server.go:26)

// The Subscription struct is used to subscribe to a given provider.
type Subscription struct {
	// The client to which messages are sent. The implementation of the interface does not have to be
	// thread-safe – providers will not call methods on it concurrently.
	Client MessageWriter

Client drops messages longer than 4096 bytes

Hello! I've been using go-sse lately and have been enjoying it a lot. One issue that I came across recently is that messages that are longer than 4096 bytes get dropped by the client. I narrowed this down to an issue with how we parse messages from the event stream.

PR #2 contains a test case that reproduces this bug, along with a fix. Let me know if you have any questions, or need more information!
