Coder Social home page Coder Social logo

elchinmemphis / memphis.go Goto Github PK

View Code? Open in Web Editor NEW

This project forked from superstreamlabs/memphis.go

0.0 0.0 0.0 206 KB

Go client for Memphis. Memphis is a Real-Time Data Processing Platform

Home Page: https://pkg.go.dev/github.com/memphisdev/memphis.go

License: Apache License 2.0

Go 100.00%

memphis.go's Introduction

Memphis light logo

Memphis light logo

Simple as RabbitMQ, Robust as Apache Kafka, and Perfect for microservices.

Memphis UI

CNCF Silver Member

CNCF Silver Member

Sandbox - Docs - Twitter - YouTube

Discord Code Of Conduct GitHub release (latest by date)

Memphis{dev} is an open-source real-time data processing platform
that provides end-to-end support for in-app streaming use cases using Memphis distributed message broker.
Memphis' platform requires zero ops, enables rapid development, extreme cost reduction,
eliminates coding barriers, and saves a great amount of dev time for data-oriented developers and data engineers.

Installation

After installing and running memphis broker,
In your project's directory:

go get github.com/memphisdev/memphis.go

Importing

import "github.com/memphisdev/memphis.go"

Connecting to Memphis

c, err := memphis.Connect("<memphis-host>", 
	"<application type username>", 
	"<broker-token>")

It is possible to pass connection configuration parameters, as function-parameters.
// function params
c, err := memphis.Connect("<memphis-host>", 
	"<application type username>", 
	"<broker-token>",
	Port(<int>),        
	Reconnect(<bool>),
	MaxReconnect(<int>)
	)

Once connected, all features offered by Memphis are available.

Disconnecting from Memphis

To disconnect from Memphis, call Close() on the Memphis connection object.

c.Close();

Creating a Station

Stations can be created from Conn
Passing optional parameters using functions

s0, err = c.CreateStation("<station-name>")

s1, err = c.CreateStation("<station-name>", 
 RetentionTypeOpt(<Messages/MaxMeMessageAgeSeconds/Bytes>),
 RetentionVal(<int>), 
 StorageTypeOpt(<Memory/Disk>), 
 Replicas(<int>), 
 IdempotencyWindow(<time.Duration>)) // defaults to 2 minutes

Retention Types

Memphis currently supports the following types of retention:

memphis.MaxMeMessageAgeSeconds

The above means that every message persists for the value set in the retention value field (in seconds).

memphis.Messages

The above means that after the maximum number of saved messages (set in retention value)
has been reached, the oldest messages will be deleted.

memphis.Bytes

The above means that after maximum number of saved bytes (set in retention value)
has been reached, the oldest messages will be deleted.

Storage Types

Memphis currently supports the following types of messages storage:

memphis.Disk

The above means that messages persist on disk.

memphis.Memory

The above means that messages persist on the main memory.

Destroying a Station

Destroying a station will remove all its resources (including producers and consumers).

err := s.Destroy();

Produce and Consume Messages

The most common client operations are producing messages and consuming messages.

Messages are published to a station and consumed from it
by creating a consumer and calling its Consume function with a message handler callback function.
Consumers are pull-based and consume all the messages in a station
unless you are using a consumers group,
in which case messages are spread across all members in this group.

Memphis messages are payload agnostic. Payloads are byte slices, i.e []byte.

In order to stop receiving messages, you have to call consumer.StopConsume().
The consumer will terminate regardless of whether there are messages in flight for the client.

Creating a Producer

// from a Conn
p0, err := c.CreateProducer(
	"<station-name>",
	"<producer-name>",
	memphis.ProducerGenUniqueSuffix()
) 

// from a Station
p1, err := s.CreateProducer("<producer-name>")

Producing a message

p.Produce("<message in []byte or protoreflect.ProtoMessage(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema)>", memphis.AckWaitSec(15)) // defaults to 15 seconds

Add headers

hdrs := memphis.Headers{}
hdrs.New()
err := hdrs.Add("key", "value")
p.Produce(
	"<message in []byte or protoreflect.ProtoMessage(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema)>",
    memphis.AckWaitSec(15),
	memphis.MsgHeaders(hdrs) // defaults to empty
)

Async produce

Meaning your application won't wait for broker acknowledgement - use only in case you are tolerant for data loss

p.Produce(
	"<message in []byte or protoreflect.ProtoMessage(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema)>",
    memphis.AckWaitSec(15),
	memphis.AsyncProduce()
)

Message ID

Stations are idempotent by default for 2 minutes (can be configured), Idempotency achieved by adding a message id

p.Produce(
	"<message in []byte>/protoreflect.ProtoMessage in case it is a schema validated station",
    memphis.AckWaitSec(15),
	memphis.MsgId("343")
)

Destroying a Producer

p.Destroy();

Creating a Consumer

// creation from a Station
consumer0, err = s.CreateConsumer("<consumer-name>",
  memphis.ConsumerGroup("<consumer-group>"), // defaults to consumer name
  memphis.PullInterval(<pull interval time.Duration), // defaults to 1 second
  memphis.BatchSize(<batch-size int), // defaults to 10
  memphis.BatchMaxWaitTime(<time.Duration>), // defaults to 5 seconds, has to be at least 1 ms
  memphis.MaxAckTime(<time.Duration>), // defaults to 30 sec
  memphis.MaxMsgDeliveries(<int>), // defaults to 10
  memphis.ConsumerGenUniqueSuffix(),
  memphis.ConsumerErrorHandler(func(*Consumer, error){})
)
  
// creation from a Conn
consumer1, err = c.CreateConsumer("<station-name>", "<consumer-name>", ...) 

Processing Messages

First, create a callback function that receives a slice of pointers to memphis.Msg and an error.

Then, pass this callback into consumer.Consume function.

The consumer will try to fetch messages every pullInterval (that was given in Consumer's creation) and call the defined message handler.

func handler(msgs []*memphis.Msg, err error) {
	if err != nil {
		m := msgs[0]
		fmt.Println(string(m.Data()))
		m.Ack()
	}
}

consumer.Consume(handler)

You can trigger a single fetch with the Fetch() method

msgs, err := consumer.Fetch()

Acknowledging a Message

Acknowledging a message indicates to the Memphis server to not
re-send the same message again to the same consumer or consumers group.

message.Ack();

Get headers

Get headers per message

headers := msg.GetHeaders()

Destroying a Consumer

consumer.Destroy();

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.