superstreamlabs / memphis.go

Go client for Memphis. Memphis is an event processing platform.

Home Page: https://pkg.go.dev/github.com/memphisdev/memphis.go

License: Apache License 2.0

Topics: go, golang, message-broker, message-queue, sdk-go

memphis.go's Introduction


Cloud - Docs - X - YouTube

Memphis.dev is a highly scalable, painless, and effortless data streaming platform, made to enable developers and data teams to collaborate and build real-time and streaming apps fast.

Installation

After installing and running the Memphis broker, run the following in your project's directory:

go get github.com/memphisdev/memphis.go

Importing

import "github.com/memphisdev/memphis.go"

Connecting to Memphis

c, err := memphis.Connect("<memphis-host>",
	"<application type username>",
	memphis.ConnectionToken("<connection-token>"), // you will get it on application-type user creation
	memphis.Password("<password>")) // depends on how Memphis is deployed - the default is connection-token-based authentication

The Connect function establishes a connection to Memphis. Connecting to Memphis (cloud or open-source) is required in order to use any other functionality of the SDK. Once connected, all of Memphis' features are available.

Configuring the connection to Memphis in the Go SDK can be done by passing in the different configuration functions to the Connect function.

// function params
c, err := memphis.Connect("<memphis-host>",
	"<application type username>",
	memphis.ConnectionToken("<connection-token>"), // you will get it on application-type user creation
	memphis.Password("<password>"), // depends on how Memphis is deployed - the default is connection-token-based authentication
	memphis.AccountId(<int>), // found on the profile page in the Memphis UI; only relevant for the cloud version of Memphis, otherwise it is ignored
	memphis.Port(<int>), // defaults to 6666
	memphis.Reconnect(<bool>), // defaults to true
	memphis.MaxReconnect(<int>), // maximum number of reconnection attempts; the default is -1, which means unlimited reconnection attempts
	memphis.ReconnectInterval(<time.Duration>), // defaults to 1 second
	memphis.Timeout(<time.Duration>), // defaults to 15 seconds
	// for TLS connection:
	memphis.Tls("<cert-client.pem>", "<key-client.pem>", "<rootCA.pem>"),
)

Here is an example of connecting to Memphis using a password (using the default user:root password:memphis login with Memphis open-source):

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

Connecting to Memphis cloud will require the account id and broker hostname. You may find these on the Overview page of the Memphis cloud UI at the top of the page. Here is an example of connecting to a cloud broker that is located in US East:

conn, err := memphis.Connect("aws-us-east-1.cloud.memphis.dev", "my_client_username", memphis.Password("memphis"), memphis.AccountId(123456789))

It is possible to use a token-based connection to Memphis as well, where multiple users can share the same token to connect to Memphis. Here is an example of using memphis.Connect with a token:

conn, err := memphis.Connect("localhost", "root", memphis.ConnectionToken("memphis"))

The token will be made available when creating new users.

Memphis open-source needs to be configured to use token-based connections. See the docs for help doing this.

To use a TLS-based connection, the Tls option function needs to be invoked:

func Tls(TlsCert string, TlsKey string, CaFile string) Option {
	return func(o *Options) error {
		o.TLSOpts = TLSOpts{
			TlsCert: TlsCert,
			TlsKey:  TlsKey,
			CaFile:  CaFile,
		}
		return nil
	}
}

Using this to connect to Memphis looks like this:

conn, err := memphis.Connect("localhost", "root", memphis.Tls(
    "~/tls_file_path.key",
    "~/tls_cert_file_path.crt",
    "~/tls_cert_file_path.crt",
))

To configure Memphis to use TLS, see the docs.

Disconnecting from Memphis

To disconnect from Memphis, call Close() on the Memphis connection object.

c.Close()
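
In Go it is common (and not Memphis-specific) to defer the close right after a successful connect, so the connection is released when the surrounding function returns:

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))
if err != nil {
    // handle err
}
defer conn.Close() // closes the connection when the function returns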

Creating a Station

Stations are distributed units that store messages. Producers add messages to stations and Consumers take messages from them. Each station stores messages until its retention policy causes them to either be deleted or moved to remote storage.

A station will be automatically created for the user when a consumer or producer is used if no stations with the given station name exist.

Stations can be created from a Memphis connection (Conn), passing optional parameters using option functions. If the station already exists when this function is called, nothing will change in the existing station.

s0, err = c.CreateStation("<station-name>")

s1, err = c.CreateStation("<station-name>", 
 memphis.RetentionTypeOpt(<Messages/MaxMessageAgeSeconds/Bytes/AckBased>), // AckBased - cloud only
 memphis.RetentionVal(<int>), // defaults to 3600
 memphis.StorageTypeOpt(<Memory/Disk>), 
 memphis.Replicas(<int>), 
 memphis.IdempotencyWindow(<time.Duration>), // defaults to 2 minutes
 memphis.SchemaName(<string>),
 memphis.SendPoisonMsgToDls(<bool>), // defaults to true
 memphis.SendSchemaFailedMsgToDls(<bool>), // defaults to true
 memphis.TieredStorageEnabled(<bool>), // defaults to false
 memphis.PartitionsNumber(<int>), // default is 1 partition
 memphis.DlsStation(<string>), // defaults to "" (no DLS station) - if set, DLS events will also be sent to the selected station
)

The CreateStation function is used to create a station. Using the different arguments, one can programmatically create many different types of stations. The Memphis UI can also be used to create stations to the same effect.

A minimal example, using all default values would simply create a station with the given name:

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

station, err := conn.CreateStation("myStation")

To change what criteria the station uses to decide if a message should be retained in the station, change the retention type. The different types of retention are documented here in the Go README.

The unit of the retention value will vary depending on the RetentionType. The previous link also describes what units will be used.

Here is an example of a station which will only hold up to 10 messages:

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

station, err := conn.CreateStation(
    "myStation",
    memphis.RetentionTypeOpt(memphis.Messages),
    memphis.RetentionVal(10),
    )

Memphis stations can either store messages on disk or in memory. A comparison of those types of storage can be found here.

Here is an example of how to create a station that uses Memory as its storage type:

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

station, err := conn.CreateStation(
    "myStation",
    memphis.StorageTypeOpt(memphis.Memory),
    )

In order to make a station more redundant, replicas can be used. Read more about replicas here. Note that replicas are only available in cluster mode. Cluster mode can be enabled in the Helm settings when deploying Memphis with Kubernetes.

Here is an example of creating a station with 3 replicas:

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

station, err := conn.CreateStation(
    "myStation",
    memphis.Replicas(3),
    )

Idempotency defines how Memphis will prevent duplicate messages from being stored or consumed. The duration of time that message IDs will be stored in the station can be set with the IdempotencyWindow StationOpt. If the environment Memphis is deployed in has an unreliable connection and/or a lot of latency, increasing this value might be desirable. The default duration is two minutes. Read more about idempotency here.

Here is an example of changing the idempotency window to 3 minutes:

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

station, err := conn.CreateStation(
    "myStation",
    memphis.IdempotencyWindow(3 * time.Minute),
    )

The SchemaName is used to set a schema to be enforced by the station. The default value ensures that no schema is enforced. Here is an example of changing the schema to a defined schema in schemaverse called "sensorLogs":

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

station, err := conn.CreateStation(
    "myStation",
    memphis.SchemaName("sensorLogs")
    )

There are two parameters for sending messages to the dead-letter station (DLS). Use the functions SendPoisonMsgToDls and SendSchemaFailedMsgToDls to set these parameters.

Here is an example of sending poison messages to the DLS but not messages which fail to conform to the given schema.

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

station, err := conn.CreateStation(
    "myStation",
    memphis.SchemaName("SensorLogs"),
    memphis.SendPoisonMsgToDls(true),
    memphis.SendSchemaFailedMsgToDls(false),
    )

When either of the DLS flags is set to true, a station can also be set to handle these events. To set the station to which schema-failed or poison messages will be sent, use the DlsStation StationOpt:

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

station, err := conn.CreateStation(
    "myStation",
    memphis.SchemaName("SensorLogs"),
    memphis.SendPoisonMsgToDls(true),
    memphis.SendSchemaFailedMsgToDls(false),
    memphis.DlsStation("badSensorMessagesStation")
    )

When the retention value is met, Memphis by default will delete old messages. If tiered storage is set up, Memphis can instead move messages to tier-2 storage. Read more about tiered storage here. Enable this setting with the respective StationOpt:

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

station, err := conn.CreateStation(
    "myStation",
    memphis.TieredStorageEnabled(true),
    )

Partitioning might be useful for a station. To have a station partitioned, simply set the PartitionsNumber StationOpt:

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

station, err := conn.CreateStation(
    "myStation",
    memphis.PartitionsNumber(3),
    )

Retention Types

Retention types define the methodology behind how a station behaves with its messages. Memphis currently supports the following retention types:

memphis.MaxMessageAgeSeconds

When the retention type is set to MAX_MESSAGE_AGE_SECONDS, messages will persist in the station for the number of seconds specified in the retention_value.

memphis.Messages

When the retention type is set to MESSAGES, the station will only hold up to retention_value messages. The station will delete the oldest messages to maintain at most retention_value messages.

memphis.Bytes

When the retention type is set to BYTES, the station will only hold up to retention_value bytes. The oldest messages will be deleted in order to maintain at most retention_value bytes in the station.

memphis.AckBased // for cloud users only

When the retention type is set to ACK_BASED, messages in the station will be deleted after they are acked by all subscribed consumer groups.

Retention Values

The unit of the retention value changes depending on the retention type specified.

All retention values are of type int. The following units are used based on the respective retention type:

memphis.MaxMessageAgeSeconds is represented in seconds,
memphis.Messages is a number of messages,
memphis.Bytes is a number of bytes,
with memphis.AckBased, the retention value is ignored.

Storage Types

Memphis currently supports the following types of message storage:

memphis.Disk

When storage is set to DISK, messages are stored on disk.

memphis.Memory

When storage is set to MEMORY, messages are stored in the system memory (RAM).

Destroying a Station

Destroying a station will remove all its resources (including producers and consumers).

err := s.Destroy()

Creating a new Schema

If the schema already exists, a new version of it will be created.

err := conn.CreateSchema("<schema-name>", "<schema-type>", "<schema-file-path>")
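
For instance, a hedged sketch (the schema name, type string, and file path below are illustrative placeholders) that registers a JSON schema might look like:

err := conn.CreateSchema("sensorLogs", "json", "./schemas/sensor_logs.json")

// Handle err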

Enforcing a Schema on an Existing Station

err := conn.EnforceSchema("<schema-name>", "<station-name>")

Deprecated - Attaching Schema

Use EnforceSchema instead.

err := conn.AttachSchema("<schema-name>", "<station-name>")

Detaching a Schema from Station

err := conn.DetachSchema("<station-name>")

Produce and Consume Messages

The most common client operations are producing messages and consuming messages.

Messages are published to a station with a Producer and consumed from it with a Consumer, by creating a consumer and calling its Consume function with a message-handler callback.

Alternatively, consumers may call the Fetch function to consume only a specific number of messages.

Consumers are pull-based and consume all the messages in a station unless you are using a consumer group, in which case messages are spread across all members of the group.

Memphis messages are payload agnostic. Payloads are byte slices, i.e., []byte.

In order to stop receiving messages, you have to call consumer.StopConsume().

The consumer will terminate even if there are messages currently being sent to the consumer.
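
Putting these pieces together, here is a minimal end-to-end sketch, assuming a local open-source broker with the default root/memphis credentials; the station, producer, and consumer names are placeholders:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/memphisdev/memphis.go"
)

func main() {
	conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Produce a message through the connection (a producer named "exampleProducer" is created or reused).
	err = conn.Produce("myStation", "exampleProducer", []byte("hello"), []memphis.ProducerOpt{}, []memphis.ProduceOpt{})
	if err != nil {
		panic(err)
	}

	// Consume with a handler callback; messages are delivered in batches.
	consumer, err := conn.CreateConsumer("myStation", "exampleConsumer")
	if err != nil {
		panic(err)
	}

	handler := func(msgs []*memphis.Msg, err error, ctx context.Context) {
		if err != nil {
			fmt.Printf("Fetch failed: %v\n", err)
			return
		}
		for _, msg := range msgs {
			fmt.Println(string(msg.Data()))
			msg.Ack()
		}
	}

	consumer.Consume(handler)
	time.Sleep(10 * time.Second) // let the consumer poll for a while
	consumer.StopConsume()
}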

Creating a Producer

// from a Conn
p0, err := c.CreateProducer(
	"<station-name>",
	"<producer-name>",
) 

// from a Station
p1, err := s.CreateProducer("<producer-name>")

Producing a message

Both producers and connections can use the produce function. To produce a message from a connection, simply call connection.Produce. This function will create a producer if none with the given name exists, otherwise it will pull the producer from a cache and use it to produce the message.

Here is an example of producing from a connection (a receiver function of the connection struct):

c.Produce("station_name_c_produce", "producer_name_a", []byte("Hey There!"), []memphis.ProducerOpt{}, []memphis.ProduceOpt{})

Here is an example of producing from a producer p (a receiver function of the producer struct).

Creating a producer and calling Produce on it improves performance, as it avoids the latency of fetching the producer from the cache.

p.Produce("<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema) or []byte or map[string]interface{} or struct with avro tags(schema validated station - avro schema)>", memphis.AckWaitSec(15)) // defaults to 15 seconds

Note: When producing a message using the Avro format ([]byte or map[string]interface{}), Go int values are converted to float64, and Go float64 corresponds to the Avro double type. Therefore an Avro schema used with this SDK can't have int fields; use double instead. E.g.

myData := map[string]interface{}{
	"username": "John",
	"age":      30,
}
{
	"type": "record",
	"namespace": "com.example",
	"name": "test_schema",
	"fields": [
		{ "name": "username", "type": "string" },
		{ "name": "age", "type": "double" }
	]
}

Note: When producing to a station with more than one partition, the producer will produce messages in a Round Robin fashion between the different partitions.

For message data formats see here.

Here is an example of a produce function call that waits up to 30 seconds for an acknowledgement from memphis:

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

producer, err := conn.CreateProducer(
    "StationToProduceFor",
    "MyNewProducer",
)

// Handle err

err = producer.Produce(
    []byte("My Message :)"),
    memphis.AckWaitSec(30),
)

// Handle err

As discussed before in the station section, idempotency is an important feature of Memphis. To achieve idempotency, an ID must be assigned to messages that are being produced. Use the MsgId ProducerOpt for this purpose.

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

producer, err := conn.CreateProducer(
    "StationToProduceFor",
    "MyNewProducer",
    // MsgID not supported yet...
)

// Handle err

err = producer.Produce(
    []byte("My Message :)"),
)

// Handle err

To add message headers to the message, use the headers parameter. Headers can help with observability when using certain third-party tools to monitor the behavior of Memphis. See here for more details.

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

producer, err := conn.CreateProducer(
    "StationToProduceFor",
    "MyNewProducer",
)

// Handle err

hdrs := memphis.Headers{}
hdrs.New()
err = hdrs.Add("key", "value")

// Handle err

err = producer.Produce(
    []byte("My Message :)"),
    memphis.MsgHeaders(hdrs),
)

// Handle err

Lastly, Memphis can produce to a specific partition in a station. To do so, use the ProducerPartitionKey option:

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

producer, err := conn.CreateProducer(
    "StationToProduceFor",
    "MyNewProducer",
)

// Handle err

err = producer.Produce(
    []byte("My Message :)"),
    memphis.ProducerPartitionKey("2ndPartition"),
)

// Handle err

Async produce

For better performance: the client will not wait for an acknowledgment before sending more messages.

p.Produce(
	"<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema) or []byte or map[string]interface{} or struct with avro tags(schema validated station - avro schema)>",
    memphis.AckWaitSec(15),
	memphis.AsyncProduce()
)

Sync produce

For better reliability: the client waits for an acknowledgment from the broker before sending another message.

p.Produce(
	"<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema) or []byte or map[string]interface{} or struct with avro tags(schema validated station - avro schema)>",
    memphis.AckWaitSec(15),
	memphis.SyncProduce()
)

Produce using partition number

The partition number will be used to produce messages to a specific partition.

p.Produce(
	"<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema) or []byte or map[string]interface{} or struct with avro tags(schema validated station - avro schema)>",
    memphis.ProducerPartitionNumber(<int>)
)
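
For example, a hedged sketch (the partition number and payload are illustrative) using a previously created producer p:

err = p.Produce(
	[]byte("My Message :)"),
	memphis.ProducerPartitionNumber(2),
)

// Handle err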

Produce to multiple stations

Producing to multiple stations can be done by creating a producer with multiple stations and then calling produce on that producer.

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

producer, err := conn.CreateProducer(
    []string{"station1", "station2", "station3"},
    "MyNewProducer",
)

// Handle err

err = producer.Produce(
    []byte("My Message :)"),
    memphis.AckWaitSec(30),
)

// Handle err

In this example, the producer sends a message to three different stations: station1, station2, and station3. Alternatively, it is also possible to produce to multiple stations using the connection:

conn.Produce([]string{"station1", "station2", "station3"}, "producer_name_a", []byte("Hey There!"), []memphis.ProducerOpt{}, []memphis.ProduceOpt{})

Destroying a Producer

p.Destroy()

Creating a Consumer

// creation from a Station
consumer0, err = s.CreateConsumer("<consumer-name>",
  memphis.ConsumerGroup("<consumer-group>"), // defaults to consumer name
  memphis.PullInterval(<pull-interval time.Duration>), // defaults to 1 second
  memphis.BatchSize(<batch-size int>), // defaults to 10
  memphis.BatchMaxWaitTime(<time.Duration>), // defaults to 5 seconds, has to be at least 1 ms
  memphis.MaxAckTime(<time.Duration>), // defaults to 30 seconds
  memphis.MaxMsgDeliveries(<int>), // defaults to 2
  memphis.ConsumerErrorHandler(func(*Consumer, error){}),
  memphis.StartConsumeFromSeq(<uint64>), // start consuming from a specific sequence, defaults to 1
  memphis.LastMessages(<int64>), // consume the last N messages, defaults to -1 (all messages in the station)
)

// creation from a Conn
consumer1, err = c.CreateConsumer("<station-name>", "<consumer-name>", ...) 

Consumers are used to pull messages from a station. Here is how to create a consumer with all of the default parameters:
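
A minimal sketch, assuming the same local connection as in earlier examples (the station and consumer names are placeholders):

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

consumer, err := conn.CreateConsumer(
    "MyStation",
    "MyNewConsumer",
)

// Handle err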

Note: When consuming from a station with more than one partition, the consumer will consume messages in Round Robin fashion from the different partitions.

To create a consumer in a consumer group, add the ConsumerGroup parameter:

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

consumer, err := conn.CreateConsumer(
    "MyStation",
    "MyNewConsumer",
    memphis.ConsumerGroup("ConsumerGroup1"),
)

// Handle err

When using the Consume function from a consumer, the consumer will continue to consume in an infinite loop. To change the rate at which the consumer polls, change the PullInterval consumer option:

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

consumer, err := conn.CreateConsumer(
    "MyStation",
    "MyNewConsumer",
    memphis.PullInterval(2 * time.Second),
)

// Handle err

Every time the consumer pulls from the station, the consumer will try to take BatchSize messages from the station. However, sometimes there are not enough messages in the station for the consumer to consume a full batch. In this case, the consumer will continue to wait until either BatchSize messages are gathered or the duration specified by BatchMaxWaitTime is reached.

Here is an example of a consumer that will try to pull 100 messages every 10 seconds while waiting up to 15 seconds for all messages to reach the consumer.

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

consumer, err := conn.CreateConsumer(
    "MyStation",
    "MyNewConsumer",
    memphis.PullInterval(10 * time.Second),
    memphis.BatchSize(100),
    memphis.BatchMaxWaitTime(15 * time.Second),
)

// Handle err

The MaxMsgDeliveries ConsumerOpt sets the maximum number of times a message can be delivered to the consumer without being acknowledged.

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

consumer, err := conn.CreateConsumer(
    "MyStation",
    "MyNewConsumer",
    memphis.PullInterval(10 * time.Second),
    memphis.BatchSize(100),
    memphis.BatchMaxWaitTime(15 * time.Second),
    memphis.MaxMsgDeliveries(100),
)

// Handle err

Passing a context to a message handler

ctx := context.Background()
ctx = context.WithValue(ctx, "key", "value")
consumer.SetContext(ctx)
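
The context set here is the one the SDK passes to the message handler, so values attached to it can be read back inside the handler. A hedged sketch, reusing the "key"/"value" pair set above (the handler shape matches the one shown in the next section):

handler := func(msgs []*memphis.Msg, err error, ctx context.Context) {
	if err != nil {
		fmt.Printf("Fetch failed: %v\n", err)
		return
	}

	// Read back the value attached via SetContext.
	if v, ok := ctx.Value("key").(string); ok {
		fmt.Println("context value:", v)
	}

	for _, msg := range msgs {
		msg.Ack()
	}
}

consumer.Consume(handler)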

Processing Messages

First, create a callback function that receives a slice of pointers to memphis.Msg, an error, and a context.Context.

Then, pass this callback into the consumer.Consume function.

The consumer will try to fetch messages every PullInterval (given at the consumer's creation) and call the defined message handler.

func handler(msgs []*memphis.Msg, err error, ctx context.Context) {
	if err != nil {
		fmt.Printf("Fetch failed: %v", err)
		return
	}

	for _, msg := range msgs {
		fmt.Println(string(msg.Data()))
		msg.Ack()
	}
}

consumer.Consume(handler,
				memphis.ConsumerPartitionKey(<string>), // use the partition key to consume from a specific partition (if not specified, consume in a Round Robin fashion)
)

consumer.Consume(handler,
				memphis.ConsumerPartitionNumber(<int>),
)

Consumer schema deserialization

To get messages deserialized, use msg.DataDeserialized().

func handler(msgs []*memphis.Msg, err error, ctx context.Context) {
	if err != nil {
		fmt.Printf("Fetch failed: %v", err)
		return
	}

	for _, msg := range msgs {
		fmt.Println(string(msg.DataDeserialized()))
		msg.Ack()
	}
}

There may be some instances where you apply a schema after a station has already received some messages. In order to consume those messages, DataDeserialized may be used to read them without trying to apply the schema to them. As an example, if you produced a string to a station and then attached a protobuf schema, using DataDeserialized will not try to deserialize the string as a protobuf-formatted message.

Fetch a single batch of messages

msgs, err := conn.FetchMessages("<station-name>", "<consumer-name>",
  memphis.FetchBatchSize(<int>), // defaults to 10
  memphis.FetchConsumerGroup("<consumer-group>"), // defaults to consumer name
  memphis.FetchBatchMaxWaitTime(<time.Duration>), // defaults to 100 ms, has to be at least 100 ms
  memphis.FetchMaxAckTime(<time.Duration>), // defaults to 10 seconds
  memphis.FetchMaxMsgDeliveries(<int>), // defaults to 2
  memphis.FetchConsumerErrorHandler(func(*Consumer, error){}),
  memphis.FetchStartConsumeFromSeq(<uint64>), // start consuming from a specific sequence, defaults to 1
  memphis.FetchLastMessages(<int64>), // consume the last N messages, defaults to -1 (all messages in the station)
  memphis.FetchPartitionKey(<string>), // use the partition key to consume from a specific partition (if not specified, consume in a Round Robin fashion)
)
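
A concrete, hedged sketch (station name, consumer name, and batch size are illustrative):

conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis"))

// Handle err

msgs, err := conn.FetchMessages("myStation", "fetchingConsumer",
	memphis.FetchBatchSize(5),
)

// Handle err

for _, msg := range msgs {
	fmt.Println(string(msg.Data()))
	msg.Ack()
}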

Fetch a single batch of messages after creating a consumer

prefetch = true will prefetch the next batch of messages and save it in memory for future Fetch() requests.
Note: Use a higher MaxAckTime as the messages will sit in a local cache for some time before being processed and Ack'd.

msgs, err := consumer.Fetch(<batch-size> int,
							<prefetch> bool,
							memphis.ConsumerPartitionKey(<string>), // use the partition key to consume from a specific partition (if not specified, consume in a Round Robin fashion)
)
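
For example (a hedged sketch; the batch size and prefetch flag below are illustrative):

// Fetch up to 10 messages, without prefetching the next batch.
msgs, err := consumer.Fetch(10, false)

// Handle err

for _, msg := range msgs {
	fmt.Println(string(msg.Data()))
	msg.Ack()
}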

Acknowledging a Message

Acknowledging a message indicates to the Memphis server not to re-send the same message again to the same consumer or consumer group.

message.Ack()

Nacking a Message

Mark the message as not acknowledged - the broker will resend the message immediately to the same consumer group, instead of waiting for the configured max ack time.

message.Nack()

Sending a message to the dead-letter

Sending the message to the dead-letter station (DLS) - the broker won't resend the message again to the same consumer group and will place the message inside the dead-letter station (DLS) with the given reason. The message will still be available to other consumer groups.

message.DeadLetter("reason")

Delay the message after a given duration

Delay the message and tell the Memphis server to re-send the same message again to the same consumer group. The message will be redelivered only if Consumer.MaxMsgDeliveries has not been reached yet.

message.Delay(<time.Duration>)
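
To show where Ack, Nack, Delay, and DeadLetter typically fit together, here is a hedged sketch of a message handler that acks on success, delays on a transient failure, and dead-letters everything else; processMessage and errTransient are hypothetical application code, not part of the SDK. Nack could be used instead of Delay when immediate redelivery is preferred.

handler := func(msgs []*memphis.Msg, err error, ctx context.Context) {
	if err != nil {
		fmt.Printf("Fetch failed: %v\n", err)
		return
	}

	for _, msg := range msgs {
		procErr := processMessage(msg.Data()) // hypothetical business logic
		switch {
		case procErr == nil:
			msg.Ack() // processed successfully, do not redeliver
		case errors.Is(procErr, errTransient): // hypothetical transient error value
			msg.Delay(30 * time.Second) // retry later; still bounded by MaxMsgDeliveries
		default:
			msg.DeadLetter("processing failed: " + procErr.Error()) // park in the DLS, no redelivery to this consumer group
		}
	}
}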

Get headers

Get headers per message

headers := msg.GetHeaders()

Get message sequence number

Get message sequence number

sequenceNumber, err := msg.GetSequenceNumber()

Get message time sent

Get message time sent

timeSent, err := msg.GetTimeSent()

Destroying a Consumer

consumer.Destroy()

Check if broker is connected

conn.IsConnected()


memphis.go's Issues

Reconnection flow test

The Conn object holds two connections, one to NATS and the other to Memphis. The NATS reconnection logic is already implemented by NATS; however, the connection with Memphis is TCP-based and the SDK is supposed to manage the reconnection logic.

We need to test the latter.
Example scenario:
bring up Memphis, connect with the SDK with reconnection interval of X with 3 reconnections, take Memphis down for X < interval < 2X => SDK should reconnect successfully

Consumer timeout error

As in the title, I receive an error every pullInterval (error screenshot omitted).

My code:


func FunFactConsumer(ctx context.Context) {
    conn, err := memphis.Connect(config.Memphis().Host, config.Memphis().Username, memphis.Password(config.Memphis().Password), memphis.Timeout(time.Second * 30))
    if err != nil {
        zap.S().Fatal(err)
    }
    defer conn.Close()

    consumer, err := conn.CreateConsumer("fun-facts", "gfun", memphis.PullInterval(15*time.Second))

    if err != nil {
        zap.S().Fatal(err)
    }

    handler := func(msgs []*memphis.Msg, err error, ctx context.Context) {
        if err != nil {
            zap.S().Error(err)
            return
        }

        for _, msg := range msgs {
			zap.S().Info("Memphis message: " + string(msg.Data()))
            msg.Ack()
			zap.S().Infof("Memphis headers: %+v", msg.GetHeaders())
        }
    }

	ctx = context.WithValue(ctx, "key", "value")
	consumer.SetContext(ctx)
    consumer.Consume(handler)

    <-ctx.Done()

	zap.S().Info("Stopping Memphis consumer")
	consumer.StopConsume()
}

Testing framework

Currently we use only Go's testing package, but we rely on running a Docker container that runs Memphis in the background.

We need to either mock Memphis or introduce a testing framework that allows us to control the container.

Consumer stops working on connection interruption

I was just testing what happens when the connection to the broker is interrupted, and it turns out that the consumer is not able to continue working.
In the web UI it still shows the consumer being (re)connected, but it does not consume anymore, so I guess the goroutine dies somewhere in the background.

Do I need to handle this case somehow on my own?

I just tested with simple stop and re-start of kubectl port-forward services/memphis-cluster 6666:6666

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/memphisdev/memphis.go"
)

const (
	station = "test"
)

var (
	groupName  string
	isConsumer bool
)

func init() {
	flag.StringVar(&groupName, "group", "group", "group name")
	flag.BoolVar(&isConsumer, "consumer", false, "as consumer")
	flag.Parse()

}

func main() {

	log.Print("connect")
	conn, err := memphis.Connect("localhost", "app", "token")
	if err != nil {
		os.Exit(1)
	}
	defer conn.Close()

	st, err := conn.CreateStation(station,
		memphis.RetentionVal(600),
		memphis.Replicas(1),
		memphis.SendPoisonMsgToDls(true), // defaults to true
	)
	if err != nil {
		os.Exit(1)
	}
	kill := make(chan os.Signal, 1)
	signal.Notify(kill, syscall.SIGINT, syscall.SIGABRT, syscall.SIGTERM)

	if isConsumer {
		consumer(st)
		
	} else {
		producer(st)
	}

	<-kill
}

func consumer(conn *memphis.Station) {
	consumer, err := conn.CreateConsumer(groupName, memphis.ConsumerGenUniqueSuffix(), memphis.PullInterval(1*time.Second))

	if err != nil {
		fmt.Printf("Consumer creation failed: %v", err)
		os.Exit(1)
	}

	handler := func(msgs []*memphis.Msg, err error, ctx context.Context) {
		if err != nil {
			fmt.Printf("Fetch failed: %v", err)
			return
		}

		for _, msg := range msgs {
			fmt.Println("data:" + string(msg.Data()))
			msg.Ack()
			headers := msg.GetHeaders()
			fmt.Println(headers)
		}
	}

	ctx := context.Background()
	ctx = context.WithValue(ctx, "key", "value")
	consumer.SetContext(ctx)
	log.Println(consumer.Consume(handler))
}

func producer(conn *memphis.Station) {
	producer, err := conn.CreateProducer(groupName, memphis.ProducerGenUniqueSuffix())

	if err != nil {
		fmt.Printf("Producer creation failed: %v", err)
		os.Exit(1)
	}

	ticker := time.NewTicker(time.Second)
	done := make(chan bool)
	go func() {
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				log.Println("generate message")

				log.Println(producer.Produce([]byte("msg" + time.Now().String())))

			}
		}
	}()

	kill := make(chan os.Signal, 1)
	signal.Notify(kill, syscall.SIGINT, syscall.SIGABRT, syscall.SIGTERM)
	<-kill
	ticker.Stop()

}

[Bug] go build ./... failed

go build ./... failed with errors:

examples/producer.go:10:6: main redeclared in this block
        examples/consumer.go:12:6: other declaration of main

Wrong folders organization
