avitaltrifsik / memphis.py

This project forked from superstreamlabs/memphis.py

Python client for Memphis. Memphis is a Real-Time Data Processing Platform

Home Page: https://pypi.org/project/memphis-py/

License: Apache License 2.0

Python 100.00%

memphis.py's Introduction

Simple as RabbitMQ, Robust as Apache Kafka, and Perfect for microservices.

Memphis{dev} is an open-source real-time data processing platform
that provides end-to-end support for in-app streaming use cases using the Memphis distributed message broker.
The Memphis platform requires zero ops, enables rapid development and extreme cost reduction,
eliminates coding barriers, and saves a great amount of dev time for data-oriented developers and data engineers.

Installation

$ pip3 install memphis-py

Importing

from memphis import Memphis, Headers
from memphis import retention_types, storage_types

Connecting to Memphis

First, create a Memphis object, then connect to Memphis by calling memphis.connect.

import asyncio
from memphis import Memphis

async def main():
  # Create the client before the try block so that `memphis` is always defined in `finally`
  memphis = Memphis()
  try:
    await memphis.connect(
      host="<memphis-host>",
      username="<application-type username>",
      connection_token="<broker-token>",
      port="<port>", # defaults to 6666
      reconnect=True, # defaults to True
      max_reconnect=3, # defaults to 3
      reconnect_interval_ms=1500, # defaults to 1500
      timeout_ms=1500, # defaults to 1500
      # for TLS connection:
      key_file='<key-client.pem>',
      cert_file='<cert-client.pem>',
      ca_file='<rootCA.pem>'
      )
    ...
  except Exception as e:
    print(e)
  finally:
    await memphis.close()

if __name__ == '__main__':
  asyncio.run(main())

Once connected, all of the functionality offered by Memphis is available.
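The reconnect, max_reconnect, and reconnect_interval_ms options above describe a retry policy. The sketch below models such a policy in plain asyncio, to show how the two numeric values interact. The names connect_with_retry and make_flaky_connect are hypothetical and not part of memphis-py. The client's own reconnect logic may differ.

```python
import asyncio


async def connect_with_retry(connect, max_reconnect=3, reconnect_interval_ms=1500):
    """Call `connect()` once, then retry up to `max_reconnect` times on failure."""
    retries = 0
    while True:
        try:
            return await connect()
        except ConnectionError:
            if retries >= max_reconnect:
                raise  # retry budget exhausted, surface the last error
            retries += 1
            await asyncio.sleep(reconnect_interval_ms / 1000)


def make_flaky_connect(failures):
    """Build a fake connect coroutine that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    async def connect():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("broker unreachable")
        return f"connected after {state['calls']} attempts"

    return connect


if __name__ == "__main__":
    # Two failures fit inside the default budget of three retries.
    print(asyncio.run(connect_with_retry(make_flaky_connect(2), reconnect_interval_ms=10)))
```

With max_reconnect=3 the helper makes at most four attempts in total: the initial one plus three retries.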

Disconnecting from Memphis

To disconnect from Memphis, call close() on the memphis object.

await memphis.close()

Creating a Station

If the station already exists, nothing happens and the new configuration is not applied.

station = await memphis.station(
  name="<station-name>",
  schema_name="<schema-name>",
  retention_type=retention_types.MAX_MESSAGE_AGE_SECONDS, # MAX_MESSAGE_AGE_SECONDS/MESSAGES/BYTES. Defaults to MAX_MESSAGE_AGE_SECONDS
  retention_value=604800, # defaults to 604800
  storage_type=storage_types.DISK, # storage_types.DISK/storage_types.MEMORY. Defaults to DISK
  replicas=1, # defaults to 1
  idempotency_window_ms=120000, # defaults to 2 minutes
  send_poison_msg_to_dls=True, # defaults to true
  send_schema_failed_msg_to_dls=True # defaults to true
)

Retention types

Memphis currently supports the following types of retention:

memphis.retention_types.MAX_MESSAGE_AGE_SECONDS

Every message persists for the number of seconds set in the retention value.

memphis.retention_types.MESSAGES

Once the number of saved messages exceeds the retention value, the oldest messages are deleted.

memphis.retention_types.BYTES

Once the total size of saved messages exceeds the retention value (in bytes), the oldest messages are deleted.
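The three retention types can be compared with a toy model. This is only an illustration of the rules described above, not the broker's implementation: StoredMessage and apply_retention are hypothetical names, and plain strings stand in for the memphis.retention_types values.

```python
import time
from dataclasses import dataclass


@dataclass
class StoredMessage:
    payload: bytes
    stored_at: float  # seconds since the epoch


def apply_retention(messages, retention_type, retention_value, now=None):
    """Return the messages (oldest first) that survive the given retention policy."""
    now = time.time() if now is None else now
    if retention_type == "MAX_MESSAGE_AGE_SECONDS":
        # Keep messages no older than retention_value seconds.
        return [m for m in messages if now - m.stored_at <= retention_value]
    if retention_type == "MESSAGES":
        # Keep only the newest retention_value messages.
        return messages[-retention_value:] if retention_value > 0 else []
    if retention_type == "BYTES":
        # Walk from newest to oldest and stop once the byte budget is used up.
        kept, total = [], 0
        for m in reversed(messages):
            if total + len(m.payload) > retention_value:
                break
            kept.append(m)
            total += len(m.payload)
        return kept[::-1]
    raise ValueError(f"unknown retention type: {retention_type}")


if __name__ == "__main__":
    msgs = [StoredMessage(b"aaaa", 0), StoredMessage(b"bb", 50), StoredMessage(b"c", 90)]
    for policy, value in [("MAX_MESSAGE_AGE_SECONDS", 60), ("MESSAGES", 1), ("BYTES", 3)]:
        survivors = apply_retention(msgs, policy, value, now=100)
        print(policy, [m.payload for m in survivors])
```

In every case the oldest messages are the first to go. Only the measure of "too much" changes: age, count, or total size.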

Storage types

Memphis currently supports the following types of message storage:

memphis.storage_types.DISK

Messages persist on disk.

memphis.storage_types.MEMORY

Messages persist in main memory.

Destroying a Station

Destroying a station removes all of its resources (producers and consumers).

await station.destroy()

Attaching a Schema to an Existing Station

await memphis.attach_schema("<schema-name>", "<station-name>")

Detaching a Schema from Station

await memphis.detach_schema("<station-name>")

Produce and Consume messages

The most common client operations are produce to send messages and consume to receive messages.

Messages are published to a station and consumed from it by creating a consumer. Consumers are pull-based and consume all the messages in a station unless you are using a consumer group, in which case messages are spread across all members of the group.

Memphis messages are payload agnostic. Payloads are sent as a bytearray.
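Because the payload is just bytes, the application decides how to serialize it. A minimal sketch, assuming JSON over UTF-8 as the encoding. The helpers encode_payload and decode_payload are hypothetical and not part of memphis-py.

```python
import json


def encode_payload(obj) -> bytearray:
    """Serialize a JSON-compatible object into a bytearray payload."""
    return bytearray(json.dumps(obj).encode("utf-8"))


def decode_payload(data) -> dict:
    """Turn a bytes-like payload back into a Python object."""
    return json.loads(bytes(data).decode("utf-8"))


if __name__ == "__main__":
    payload = encode_payload({"order_id": 42, "status": "paid"})
    print(type(payload).__name__, len(payload))
    print(decode_payload(payload))
```

The producer side would pass the encoded bytearray as the message, and the consumer side would decode whatever the message data returns.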

To stop receiving messages, call consumer.destroy(). Destroy terminates the consumer regardless of whether there are messages in flight for the client.
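The difference between independent consumers and a consumer group can be sketched with a small simulation. It assumes round-robin distribution inside a group purely for illustration, since the text above only says that messages are spread across members. The function distribute and the consumer names are hypothetical.

```python
from collections import defaultdict
from itertools import cycle


def distribute(messages, groups):
    """Map each consumer name to the messages it would receive.

    `groups` maps a group name to the list of consumer names in that group.
    Every group sees every message; inside a group each message goes to one member.
    """
    received = defaultdict(list)
    for members in groups.values():
        turn = cycle(members)  # assumption: members take turns in round-robin order
        for msg in messages:
            received[next(turn)].append(msg)
    return dict(received)


if __name__ == "__main__":
    groups = {
        "billing": ["billing-1", "billing-2"],  # two members share the stream
        "audit": ["audit-1"],                   # a lone consumer gets everything
    }
    for consumer, msgs in distribute(["m1", "m2", "m3", "m4"], groups).items():
        print(consumer, msgs)
```

A consumer created without an explicit group behaves like the "audit" case here, because its group defaults to its own name.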

Creating a Producer

producer = await memphis.producer(station_name="<station-name>", producer_name="<producer-name>", generate_random_suffix=False)

Producing a message

await producer.produce(
  message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
  ack_wait_sec=15) # defaults to 15

Add headers

headers = Headers()
headers.add("key", "value")
await producer.produce(
  message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
  headers=headers) # defaults to {}

Async produce

With async produce, your application does not wait for the broker's acknowledgement. Use it only if you can tolerate data loss.

await producer.produce(
  message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
  headers={}, async_produce=True)

Message ID

Stations are idempotent by default for 2 minutes (configurable). Idempotency is achieved by attaching a message ID to each produced message.

await producer.produce(
  message='bytearray/protobuf class/dict', # bytes / protobuf class (schema validated station - protobuf) or bytes/dict (schema validated station - json schema)
  headers={}, 
  async_produce=True,
  msg_id="123")
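The effect of an idempotency window can be modeled in a few lines. This is a simplified sketch of the idea, not the broker's actual deduplication logic. IdempotencyWindow is a hypothetical class, and timestamps are passed in explicitly to keep the example deterministic.

```python
class IdempotencyWindow:
    """Reject a message ID that was already accepted within the last `window_ms`."""

    def __init__(self, window_ms=120000):
        self.window_ms = window_ms
        self._seen = {}  # msg_id -> time (ms) at which the ID was accepted

    def accept(self, msg_id, now_ms):
        """Return True if the message should be stored, False if it is a duplicate."""
        # Forget IDs whose window has expired.
        self._seen = {k: t for k, t in self._seen.items() if now_ms - t < self.window_ms}
        if msg_id in self._seen:
            return False
        self._seen[msg_id] = now_ms
        return True


if __name__ == "__main__":
    window = IdempotencyWindow(window_ms=120000)
    print(window.accept("123", now_ms=0))       # first time: stored
    print(window.accept("123", now_ms=60000))   # retry inside the window: dropped
    print(window.accept("123", now_ms=120000))  # window has passed: stored again
```

This is why a producer that retries after a timeout should reuse the same msg_id: the retry is then dropped instead of being stored twice.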

Destroying a Producer

await producer.destroy()

Creating a Consumer

consumer = await memphis.consumer(
  station_name="<station-name>",
  consumer_name="<consumer-name>",
  consumer_group="<group-name>", # defaults to the consumer name
  pull_interval_ms=1000, # defaults to 1000
  batch_size=10, # defaults to 10
  batch_max_time_to_wait_ms=5000, # defaults to 5000
  max_ack_time_ms=30000, # defaults to 30000
  max_msg_deliveries=10, # defaults to 10
  generate_random_suffix=False,
  start_consume_from_sequence=1, # start consuming from a specific sequence. defaults to 1
  last_messages=-1 # consume the last N messages, defaults to -1 (all messages in the station)
)

Processing messages

Once all the messages in the station have been consumed, msg_handler receives the error Memphis: TimeoutError.

async def msg_handler(msgs, error):
  for msg in msgs:
    print("message: ", msg.get_data())
    await msg.ack()
  if error:
    print(error)
consumer.consume(msg_handler)
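A handler like the one above can be exercised without a broker by passing it stand-in objects. In this sketch FakeMessage is a hypothetical stub that exposes only get_data() and ack(). Passing an empty batch together with the timeout error is an assumption about how the handler is called once the station is drained.

```python
import asyncio


class FakeMessage:
    """Stand-in exposing the two methods the handler relies on."""

    def __init__(self, data):
        self._data = data
        self.acked = False

    def get_data(self):
        return self._data

    async def ack(self):
        self.acked = True


async def msg_handler(msgs, error):
    for msg in msgs:
        print("message: ", msg.get_data())
        await msg.ack()
    if error:
        print(error)


async def run_demo():
    batch = [FakeMessage(bytearray(b"hello")), FakeMessage(bytearray(b"world"))]
    await msg_handler(batch, None)                    # a normal batch
    await msg_handler([], "Memphis: TimeoutError")    # assumed shape of the drained-station call
    return batch


if __name__ == "__main__":
    asyncio.run(run_demo())
```

Keeping the handler free of broker-specific state makes this kind of offline check straightforward.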

Acknowledge a message

Acknowledging a message tells the Memphis server not to re-send that message to the same consumer or consumer group.

await message.ack()
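What happens when a message is not acknowledged can be pictured with the max_msg_deliveries setting from the consumer options. The sketch below is a simplified model under that assumption, not the server's redelivery code. deliver_until_acked and the handlers are hypothetical, and a handler returning True stands for a successful ack.

```python
def deliver_until_acked(handler, payload, max_msg_deliveries=10):
    """Redeliver `payload` until the handler acks it or the delivery budget runs out.

    Returns a tuple (acked, deliveries_used).
    """
    for attempt in range(1, max_msg_deliveries + 1):
        if handler(payload, attempt):
            return True, attempt
    return False, max_msg_deliveries


def succeeds_on_third_try(payload, attempt):
    # Simulates a consumer that fails twice before it manages to ack.
    return attempt >= 3


def never_acks(payload, attempt):
    # Simulates a consumer that cannot process this message at all.
    return False


if __name__ == "__main__":
    print(deliver_until_acked(succeeds_on_third_try, b"order-1"))
    print(deliver_until_acked(never_acks, b"order-2"))
```

In this model a message that exhausts its deliveries is what the station options call a poison message, which is the case the send_poison_msg_to_dls flag refers to.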

Get headers

Get headers per message

headers = message.get_headers()

Get message sequence number

Get the sequence number of a message

sequence_number = msg.get_sequence_number()

Destroying a Consumer

await consumer.destroy()
