sheharyarn / que

Simple Job Processing in Elixir with Mnesia :zap:

License: MIT License

elixir gen-server job-queue mnesia background-jobs in-memory hacktoberfest cloud-native kubernetes

que's Introduction

Que


Simple Background Job Processing in Elixir ⚡

Que is a job processing library backed by Mnesia, a distributed real-time database that comes with Erlang / Elixir. That means it doesn't depend on any external services like Redis for persisting job state. This makes it really easy to use since you don't need to install anything other than Que itself.

See the Documentation.


Installation

Add que to your project dependencies in mix.exs:

def deps do
  [{:que, "~> 0.10.1"}]
end

and then add it to your list of applications:

def application do
  [applications: [:que]]
end

Mnesia Setup

Que runs out of the box, but by default all jobs are stored in-memory. To persist jobs across application restarts, specify the DB path in your config.exs:

config :mnesia, dir: 'mnesia/#{Mix.env}/#{node()}'        # Notice the single quotes

And run the following mix task:

$ mix que.setup

This will create the Mnesia schema and job database for you. For a detailed guide, see the Mix Task Documentation. For compiled releases where Mix is not available see this.
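In a compiled release where Mix tasks are unavailable, the same setup can be done from a remote console or a boot hook by calling the persistence module directly. A minimal sketch (the module name MyApp.ReleaseTasks is a hypothetical placeholder; wire it into your own release's boot sequence):

```elixir
defmodule MyApp.ReleaseTasks do
  # Hypothetical helper module; invoke this once from a release
  # pre-start hook or a remote console on the target node.
  def setup_que do
    # Creates the Mnesia schema and job table,
    # equivalent to running `mix que.setup`.
    Que.Persistence.Mnesia.setup!()
  end
end
```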


Usage

Que is very similar to other job processing libraries such as Ku and Toniq. Start by defining a Worker with a perform/1 callback to process your jobs:

defmodule App.Workers.ImageConverter do
  use Que.Worker

  def perform(image) do
    ImageTool.save_resized_copy!(image, :thumbnail)
    ImageTool.save_resized_copy!(image, :medium)
  end
end

You can now add jobs to be processed by the worker:

Que.add(App.Workers.ImageConverter, some_image)
#=> {:ok, %Que.Job{...}}

Pattern Matching

The argument here can be any term, from a Tuple to a Keyword List or a Struct. You can also pattern match and use guard clauses like any other function:

defmodule App.Workers.NotificationSender do
  use Que.Worker

  def perform(type: :like, to: user, count: count) do
    User.notify(user, "You have #{count} new likes on your posts")
  end

  def perform(type: :message, to: user, from: sender) do
    User.notify(user, "You received a new message from #{sender.name}")
  end

  def perform(to: user) do
    User.notify(user, "New activity on your profile")
  end
end
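Jobs for such a worker are enqueued like any other; which clause runs depends on the shape of the argument. A quick illustration (user is assumed to be a struct from your own application):

```elixir
# Matches the first clause (type: :like)
Que.add(App.Workers.NotificationSender, type: :like, to: user, count: 3)

# Matches the catch-all clause (to: user)
Que.add(App.Workers.NotificationSender, to: user)
```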

Concurrency

By default, all workers process one Job at a time, but you can customize that by passing the concurrency option:

defmodule App.Workers.SignupMailer do
  use Que.Worker, concurrency: 4

  def perform(email) do
    Mailer.send_email(to: email, message: "Thank you for signing up!")
  end
end
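With concurrency: 4, up to four of these jobs run at the same time; enqueueing itself is unchanged. For example (the email addresses are placeholders):

```elixir
# Enqueue several emails; Que processes up to 4 of them simultaneously
for email <- ["a@example.com", "b@example.com", "c@example.com"] do
  Que.add(App.Workers.SignupMailer, email)
end
```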

Job Success / Failure Callbacks

The worker can also export optional on_success/1 and on_failure/2 callbacks, which are invoked when a job completes successfully or fails.

defmodule App.Workers.ReportBuilder do
  use Que.Worker

  def perform({user, report}) do
    report.data
    |> PDFGenerator.generate!
    |> File.write!("reports/#{user.id}/report-#{report.id}.pdf")
  end

  def on_success({user, _}) do
    Mailer.send_email(to: user.email, subject: "Your Report is ready!")
  end

  def on_failure({user, report}, error) do
    Mailer.send_email(to: user.email, subject: "There was a problem generating your report")
    Logger.error("Could not generate report #{report.id}. Reason: #{inspect(error)}")
  end
end

Setup and Teardown

You can similarly export optional on_setup/1 and on_teardown/1 callbacks, which run before and after the job is performed (whether it succeeds or not). Instead of the job arguments, they receive the job struct itself, which holds internal details useful for custom features such as logging, metrics, and requeuing.

defmodule MyApp.Workers.VideoProcessor do
  use Que.Worker

  def on_setup(%Que.Job{} = job) do
    VideoMetrics.record(job.id, :start, process: job.pid, status: :starting)
  end

  def perform({user, video, options}) do
    User.notify(user, "Your video is processing, check back later.")
    FFMPEG.process(video.path, options)
  end

  def on_teardown(%Que.Job{} = job) do
    {user, video, _options} = job.arguments
    link = MyApp.Router.video_path(user.id, video.id)

    VideoMetrics.record(job.id, :end, status: job.status)
    User.notify(user, "We've finished processing your video. See the results.", link)
  end
end

Head over to Hexdocs for detailed Worker documentation.


Roadmap

  • Write Documentation
  • Write Tests
  • Persist Job State to Disk
    • Provide an API to interact with Jobs
  • Add Concurrency Support
    • Make jobs work in Parallel
    • Allow customizing the number of concurrent jobs
  • Success/Failure Callbacks
  • Find a more reliable replacement for Amnesia
  • Delayed Jobs
  • Allow job cancellation
  • Job Priority
  • Support running in a multi-node environment
    • Recover from node failures
  • Support for more Persistence Adapters
    • Redis
    • Postgres
  • Mix Task for creating Mnesia Database
  • Better Job Failures
    • Option to set timeout on workers
    • Add strategies to automatically retry failed jobs
  • Web UI

Contributing

  • Fork, Enhance, Send PR
  • Open issues for any bugs or feature requests
  • Implement something from Roadmap
  • Spread the word ❤️

License

This package is available as open source under the terms of the MIT License.


que's People

Contributors

danxexe, sheharyarn, valpackett


que's Issues

Que failing with Distillery release and Docker

I have setup a Docker environment with Distillery to generate a release of my application.

My Application uses Que to add a worker like so

case Que.add(SyncWorker, %{}) do
  {:ok, _job} -> {:ok, "Syncing..."}
  err -> {:error, "Error syncing: #{inspect(err)}"}
end

However when this line is called I get:

** (exit) exited in: GenServer.call({:global, {Que.Server, SolarisCards.ShopifyIntegration.Workers.SyncWorker}}, {:add_job, SolarisCards.ShopifyIntegration.Workers.SyncWorker, %{}}, 5000)
    ** (EXIT) an exception was raised:
        ** (UndefinedFunctionError) function ExUtils.Module.name/1 is undefined (module ExUtils.Module is not available)
            ExUtils.Module.name(SolarisCards.ShopifyIntegration.Workers.SyncWorker)
            (que) lib/que/server.ex:81: Que.Server.handle_call/3
            (stdlib) gen_server.erl:661: :gen_server.try_handle_call/4
            (stdlib) gen_server.erl:690: :gen_server.handle_msg/6
            (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
    (elixir) lib/gen_server.ex:989: GenServer.call/3
    (solaris_cards) lib/solaris_cards/setup.ex:12: SolarisCards.Setup.do_setup_data/0

It works fine locally without Docker and Distillery, but I'm not sure why it fails as above.

I have also tried to run the command for a precompiled application as described in the docs, with no luck.

iex([email protected])1> Que.Persistence.Mnesia.setup!

The pattern 'false' can never match the type 'true'

Cheers everyone!
I'm having this Dialyzer warning just by implementing the perform/1 function:

The pattern 'false' can never match the type 'true'

There's also another warning:

The test  1 == 'infinity' can never evaluate to 'true'

I'm using VS Code with the ElixirLS Fork extension and Elixir 1.9.4 compiled with OTP 20 (OTP 22 local)


perform completes before monitor is installed causing failure

If I have a trivial Worker, i.e. one that completes really fast, then sometimes it will be reported as a failure when it actually succeeded. Here's my worker:

  defmodule NormalWorker do
    use Que.Worker, concurrency: 4

    def perform(_args) do
      IO.puts "perform"
    end

    def on_success(_args) do
      IO.puts "success"
    end

    def on_failure(_args, _err) do
      IO.puts "failure"
    end
  end

I think the problem is that the worker process completes before Process.monitor is installed. In that case Erlang sends a :DOWN message with the reason set to :noproc (and that's the error seen in on_failure too, since it is handled by the second :DOWN clause in lib/que/server.ex):

  def handle_info({:DOWN, ref, :process, _pid, :normal}, queue) do
  def handle_info({:DOWN, ref, :process, _pid, err}, queue) do

I fixed it by adding this function (between the two above) and handling it as a success. Note the :noproc reason:

def handle_info({:DOWN, ref, :process, _pid, :noproc}, queue) do

http://erlang.org/doc/man/erlang.html#monitor-2

I'd submit a pull request, but I don't think this is a good way to fix it, because the Worker might have failed. Is it possible to somehow do the Process.monitor at the same time the Worker is spawned, like spawn_monitor?
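The race described above can be illustrated with plain processes: monitoring after spawn can miss a fast exit, while spawn_monitor/1 makes the spawn and the monitor atomic. A minimal sketch (not Que's actual internals):

```elixir
# Racy: the process may already have exited by the time we
# monitor it, so the :DOWN message arrives with reason :noproc.
pid = spawn(fn -> :ok end)
ref = Process.monitor(pid)

# Atomic: spawn and monitor in one step, so a fast normal exit
# is delivered as {:DOWN, ref2, :process, pid2, :normal}.
{pid2, ref2} = spawn_monitor(fn -> :ok end)
```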

Delayed Jobs

I see this is in your roadmap. Do you think this is something you'll have in the near term (few weeks) or is this a longer-term item?

Compilation issue on Elixir 1.7

== Compilation error in file lib/exquisite.ex ==
** (CompileError) lib/exquisite.ex:103: CALLER is available only inside defmacro and defmacrop
lib/exquisite.ex:103: (module)
could not compile dependency :exquisite, "mix compile" failed. You can recompile this dependency with "mix deps.compile exquisite", update it with "mix deps.update exquisite" or clean it with "mix deps.clean exquisite"

Elixir Version = Elixir v1.7.0-dev

Priority Queues

Related to #19.

  • Replace Erlang's :queue with a priority queue data structure
  • Update Mnesia Table definition
  • Auto-migrate on update
  • Update Que.add API to support options
  • Option validation
  • Tests
  • Docs and Readme

Merge Request - Noizu Fork

Hello sheharyarn, as mentioned previously I have a forked version of your library which adds support for priority levels, sharded workers (to avoid supervisor bottlenecks), node tenancy (jobs only run on the server they were created on or specified to run on), and dirty mnesia transactions using raw :mnesia module due to performance bottlenecks under high load.

I'd like to get things tidy enough to merge upstream and avoid the need to maintain a separate fork. With that in mind, I've made these features compile-time configurable, with the defaults behaving essentially like vanilla Que, except for the priority-level changes and an error handler that treats :noproc responses as success (which is negotiable), since those are generally caused by Que jobs that exit too quickly.

However, there is a breaking change to the table definition to include the priority-level and node-tenancy fields, and I am not sure if there is an elegant way to work around that with Memento, or if I should hide that change behind config options and modify the table structure only if tenancy and/or priority levels are enabled.

When you have a chance, give things a peek and let me know of any feedback or criticism that would need to be addressed before I make a merge request.

master...noizu:master

How to properly shutdown app with Mnesia backed queue

Hello @sheharyarn, I have a Phoenix app that uses a lot of Que Workers to process event data.
I have configured Mnesia to store the queue, and every time I need to restart the app I have to delete the Mnesia tables and run Que.Persistence.Mnesia.setup! again.
Do you know which procedure I should follow to stop Que properly?

Elixir 1.5.1
Phoenix 1.3.0
Que 0.4.1

Best regards.
Raul

When running que.setup: (Memento.MnesiaException) Mnesia operation failed

$ mix que.setup
** (Memento.MnesiaException) Mnesia operation failed
   {'Bad type on some provided arguments', Que.Persistence.Mnesia.DB.Jobs, :disc_copies, :nonode@nohost}
   Mnesia Error: {:bad_type, Que.Persistence.Mnesia.DB.Jobs, :disc_copies, :nonode@nohost}
    lib/memento/table/table.ex:274: Memento.Table.handle_for_bang!/1
    (mix 1.10.3) lib/mix/task.ex:330: Mix.Task.run_task/3
    (mix 1.10.3) lib/mix/cli.ex:82: Mix.CLI.run_task/2
    (elixir 1.10.3) lib/code.ex:926: Code.require_file/2

Not really sure where this could be happening.

Can't compile que on Erlang 20.2 and Elixir 1.6rc1

Running on Windows: Erlang 20.2, Elixir 1.6rc1

I'm getting a compilation error, see the traceback below. Any idea?

==> que
Compiling 12 files (.ex)
warning: Enum.partition/2 is deprecated, use Enum.split_with/2
  lib/que/server_supervisor.ex:75

warning: variable "worker" does not exist and is being expanded to "worker()", please use parentheses to remove the ambiguity or change the variable name
  lib/que/persistence/mnesia.ex:173

warning: variable "status" does not exist and is being expanded to "status()", please use parentheses to remove the ambiguity or change the variable name
  lib/que/persistence/mnesia.ex:173

warning: variable "status" does not exist and is being expanded to "status()", please use parentheses to remove the ambiguity or change the variable name
  lib/que/persistence/mnesia.ex:183

warning: variable "status" does not exist and is being expanded to "status()", please use parentheses to remove the ambiguity or change the variable name
  lib/que/persistence/mnesia.ex:183

warning: variable "worker" does not exist and is being expanded to "worker()", please use parentheses to remove the ambiguity or change the variable name
  lib/que/persistence/mnesia.ex:193

warning: variable "status" does not exist and is being expanded to "status()", please use parentheses to remove the ambiguity or change the variable name
  lib/que/persistence/mnesia.ex:193

warning: variable "status" does not exist and is being expanded to "status()", please use parentheses to remove the ambiguity or change the variable name
  lib/que/persistence/mnesia.ex:193

warning: variable "worker" does not exist and is being expanded to "worker()", please use parentheses to remove the ambiguity or change the variable name
  lib/que/persistence/mnesia.ex:213

warning: variable "status" does not exist and is being expanded to "status()", please use parentheses to remove the ambiguity or change the variable name
  lib/que/persistence/mnesia.ex:213


== Compilation error in file lib/que/persistence/mnesia.ex ==
** (CompileError) lib/que/persistence/mnesia.ex:173: undefined function status/0
    (stdlib) lists.erl:1338: :lists.foreach/2
    lib/que/persistence/mnesia.ex:131: (module)
could not compile dependency :que, "mix compile" failed. You can recompile this dependency with "mix deps.compile que", update it with "mix deps.update que" or clean it with "mix deps.clean que"

Using Que in a multi-node environment

We are trying to use Que to schedule jobs in a multi-node environment, allowing nodes to connect to each other and share mnesia information.

What I'm doing:

  1. Start iex consoles:

$ iex --sname node1 -S mix
$ iex --sname node2 -S mix

  2. Connect the nodes (on the node1 console):

iex> Node.connect(:node2@local_hostname)

  3. Check that they are connected with Node.list() on both sides.

  4. Spawn 100 jobs of my TestWorker (which prints a log message and waits one second):

for number <- 1..100 do
  Que.add(TestQue.TestWorker, number)
end

It works, but all jobs run on only one of the nodes (let's say node1), while on the other (node2) no job is executed.
Also, if I execute the same for loop on node2, all jobs are still executed on node1.

Is there anything we can do to:

  1. Run jobs on any node?
  2. Ensure that if one node goes down, the tasks in :mnesia remain on the rest of the cluster?

Thanks in advance!

PS: I have also tried executing the setup of Mnesia persisted to disk, with this result:

iex(node1@patata)2> nodes = [node() | Node.list]
[:node1@patata, :node2@patata]
iex(node1@patata)3> Que.Persistence.Mnesia.setup!(nodes)
[info] Application mnesia exited: :stopped
** (Memento.MnesiaException) Mnesia operation failed
   :not_active
   Mnesia Error: {:not_active, Que.Persistence.Mnesia.DB.Jobs, :node2@patata}
    (memento) lib/memento/table/table.ex:274: Memento.Table.handle_for_bang!/1

setting :mnesia for Pow breaks non-persistent que

Is there a way to let Que know to ignore the :mnesia :dir config that I set for a different process?

I had Que working fine alongside the Pow auth library.

However, I just moved to a multi-node setup and set the :mnesia :dir for Pow, even though I do not need persistence for Que.

It was running fine yesterday after using Mnesia for the first time, but today I get this:

 ** (EXIT) an exception was raised:
        ** (Memento.Error) Transaction Failed with: {:no_exists, Que.Persistence.Mnesia.DB.Jobs}
            (memento) lib/memento/transaction.ex:178: Memento.Transaction.handle_result/1
            (que) lib/que/server_supervisor.ex:72: Que.ServerSupervisor.resume_queued_jobs/0

If I delete the :mnesia :dir contents before starting, the error goes away.

Inserting into queue is slow when queue is long

Inserting into the queue is slow when the queue is long. This is already noticeable at around 10,000 items.

This mainly has to do with the implementation of the queue. Appending items to the end of a list takes significant time (I think O(n^2) overall), because the list has to be reversed twice on each update.
By replacing the implementation with Erlang's :queue, performance can be improved by about 100 times.
I ran a quick test with benchfella and got this result:

Benchmark name        Iterations    Average time
Comparison stack            5000       648.01 µs/op
Erlang queue                1000      2301.37 µs/op
Que queue                     10    188906.30 µs/op
Improved Que queue           500      3484.63 µs/op

Edit: This is the bench script
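For reference, Erlang's :queue offers amortized O(1) insertion at the rear and removal at the front, which is what makes the replacement above fast. A small sketch of the API:

```elixir
# :queue is part of Erlang's standard library, usable from Elixir
q = :queue.new()
q = :queue.in(:job1, q)   # enqueue at the rear, amortized O(1)
q = :queue.in(:job2, q)

# Dequeue from the front; returns {{:value, item}, rest}
# or {:empty, q} when the queue is empty
{{:value, job}, q} = :queue.out(q)
```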

Worker is an Invalid Worker

I have an issue with the worker. I can start the application just once and it runs (in iex).
But when I start a new iex session to run it again, I get the "Worker is an Invalid Worker" error.
I can only fix that by renaming the Worker; then it works once more before I have to rename it again.

I am using Elixir 1.4.2 and Que 0.3.1
