Coder Social home page Coder Social logo

Comments (19)

sunli829 avatar sunli829 commented on April 28, 2024 4

@atik7 https://github.com/async-graphql/examples/tree/c8219078a4b7aa6d84d22e9b79f033088897be4b/poem/subscription-redis

from async-graphql.

sunli829 avatar sunli829 commented on April 28, 2024 1

The problem has been solved.😁

from async-graphql.

sunli829 avatar sunli829 commented on April 28, 2024 1

Okay, I will provide it later today. @atik7

from async-graphql.

sunli829 avatar sunli829 commented on April 28, 2024

Global stream are easy to implement, and I'll add a component tomorrow to do just that.

Creating the stream should be an asynchronous function, so I'm going to do that first.

from async-graphql.

sunli829 avatar sunli829 commented on April 28, 2024

Are you Chinese?

from async-graphql.

nicolaiunrein avatar nicolaiunrein commented on April 28, 2024

Are you Chinese?

No German 😊
Why?

from async-graphql.

sunli829 avatar sunli829 commented on April 28, 2024

I thought we were in the same time zone.πŸ˜„

I added SimpeBroker and updated the example.
SimpleBroker should be used for standalone testing, and later I will provide support for Kafka, Redis, etc.

from async-graphql.

nicolaiunrein avatar nicolaiunrein commented on April 28, 2024

Yeah thought so too. πŸ˜† Are you from china?

from async-graphql.

sunli829 avatar sunli829 commented on April 28, 2024

Yes, I'm Chinese.😁

from async-graphql.

nicolaiunrein avatar nicolaiunrein commented on April 28, 2024

SimpleBroker panics at called `Option::unwrap()` on a `None` value if used with more than one type. Is this intended? If so this should be reflected in the docs/example.

from async-graphql.

sunli829 avatar sunli829 commented on April 28, 2024

I didn't have a test because I was going to sleep. That should be fine.

from async-graphql.

maz000 avatar maz000 commented on April 28, 2024

Can you provide a β€œRedis-based” (https://github.com/mitsuhiko/redis-rs) example of global stream for subscription?

from async-graphql.

lostb1t avatar lostb1t commented on April 28, 2024

I would be interested in an redis-rs example as well.

from async-graphql.

bponidev avatar bponidev commented on April 28, 2024

May be helpful...

publishing

let message = serde_json::to_string(&order)?;
let channel = format!("user_{}_order", driver.user_id);
let _ = caching::cache_publish(ctx, channel, message).await?;

and consuming

async fn logistics_order(
    &self,
    ctx: &Context<'_>,
    channel: String,
) -> impl Stream<Item = Order> {
    let context = ctx.data::<MainContext>().unwrap();
    let con = context.redis.get_async_connection().await.unwrap();
    let mut pubsub = con.into_pubsub();
    pubsub.subscribe(channel).await.unwrap();
    stream! {
    let mut msg_stream = pubsub.on_message();
        while let Some(msg) = msg_stream.next().await {
            let message: String = msg.get_payload().unwrap();
            let order: Order = serde_json::from_str(&message).unwrap();
            yield order;
        }
    }
}

from async-graphql.

lostb1t avatar lostb1t commented on April 28, 2024

Appreciated, actually just finished an similar implementation but using rust channels.

Because the above example creates an redis connection for each subscription. (You cannot share a pubsub channel accross threads)

So what i did was to fanout pubsub events to subscriptions with rust channels

from async-graphql.

bponidev avatar bponidev commented on April 28, 2024

Can u provide some code example, it would be greatly helpful for us.

from async-graphql.

lostb1t avatar lostb1t commented on April 28, 2024
...

lazy_static! {
  pub static ref EVENT_BUS: tokio::sync::broadcast::Sender<Event> = {
    let (tx, _) = tokio::sync::broadcast::channel(10000);
    tx
  };
}

...

#[Object]
impl Mutation {
  async fn emit_event(&self, ctx: &Context<'_>, input: Input) -> Result<Output> {
    let mut redis_conn = ctx
      .data_unchecked::<redis::aio::MultiplexedConnection>()
      .clone();
    let _: () = redis_conn
      .publish("events", serde_json::to_string(&input)?)
      .await?;

    Ok(input)
  }
}

...

#[Subscription]
impl Subscription {
  async fn receive_event(&self, ctx: &Context<'_>) -> impl Stream<Item = Event> {
    BroadcastStream::new(EVENT_BUS.subscribe())
      .filter(|msg| msg.is_ok())
      .map(|msg| msg.unwrap())
  }
}

....


#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
  let redis_url = env::var("REDIS_URL").expect("REDIS_URL is not set");
  let redis_client = redis::Client::open(redis_url).expect("client");
  let mut redis_pubsub = redis_client
    .clone()
    .get_tokio_connection()
    .await?
    .into_pubsub();
  let fanout_task = async move {
    redis_pubsub
      .subscribe("events")
      .await
      .unwrap();
    let mut msg_stream = redis_pubsub.into_on_message();
    while let Some(msg) = msg_stream.next().await {
      let msg: String = msg.get_payload().unwrap();
      let event: Event = serde_json::from_str(&msg).unwrap();
      let _ = schema::EVENT_BUS.send(event);
    }
  };
  // spawn the fanout task as a background task that will run forever.
  tokio::spawn(fanout_task);

  ...

}

from async-graphql.

atik7 avatar atik7 commented on April 28, 2024

@sunli829 Please add a redis example, thank you!

from async-graphql.

atik7 avatar atik7 commented on April 28, 2024

@sunli829 thank you :-)

from async-graphql.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    πŸ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. πŸ“ŠπŸ“ˆπŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❀️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.