
epocxy's People

Contributors

georgeye, hernanrivasacosta, jaynel, maximvl


epocxy's Issues

Add API for querying the age of an ets_buffer

Currently, to get the oldest entry you must list the whole table using history_timestamped. When the buffer is large, this can crash the VM by running out of memory. The following functions should be added (see the sketch after the list):

  1. oldest entry (returns entry with timestamp)
  2. newest entry (returns entry with timestamp)
  3. oldest N entries
  4. some sort of histogram of date ranges?
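
As a sketch of why these lookups can be cheap, an ordered_set table keyed in insertion order supports oldest/newest queries without a full scan. The layout below is illustrative only, not epocxy's internal representation:

    %% Illustrative layout: ordered_set keyed by a strictly increasing counter,
    %% storing the write timestamp alongside each entry.
    Demo = fun() ->
        T = ets:new(buf, [ordered_set]),
        [ets:insert(T, {erlang:unique_integer([monotonic]), os:timestamp(), {msg, N}})
         || N <- lists:seq(1, 5)],
        [Oldest] = ets:lookup(T, ets:first(T)),   %% O(log N): no full-table scan
        [Newest] = ets:lookup(T, ets:last(T)),
        {Oldest, Newest}
    end.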

question about `make_sup_name` in cxy_fount_sup

Hi,

I noticed that the name is rewritten by this function when starting a fount. What do you think about removing it? As it is, it takes extra work to retrieve the supervisor by its name. Thoughts?

Differentiate between 'write ring' and 'read ring' buffers

Ring buffers are great for concurrent writers because they have a natural cap on memory used and overwrite the oldest entries automatically.

Ring buffers for readers can also be great for tasks like load-balancing in a round-robin fashion by just continuously reading the next element and allowing wraparound on reads.

The current implementation deletes entries from LIFO / FIFO buffers when they are read, but not from RING buffers; however, it only allows write wraparound on RING buffers. A read-wraparound RING type needs to be introduced.
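
For illustration, read wraparound for round-robin dispatch can be sketched with an atomics counter over a fixed tuple of workers. None of this is existing ets_buffer API:

    New_Ring = fun(Workers) -> {list_to_tuple(Workers), atomics:new(1, [])} end,
    Next     = fun({Tup, Ctr}) ->
                   N = atomics:add_get(Ctr, 1, 1),                   %% ever-increasing read counter
                   element(((N - 1) rem tuple_size(Tup)) + 1, Tup)   %% wrap around on read
               end,
    Ring = New_Ring([w1, w2, w3]),
    [Next(Ring) || _ <- lists:seq(1, 5)].   %% w1, w2, w3, w1, w2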

Add timing to ets_buffer entries

A common problem when working with buffers is knowing how long entries remain in the buffer. Consider adding timestamps to buffer entries, plus statistics about how long entries persist.
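
Until such support exists, callers can approximate it by wrapping entries themselves. A minimal sketch, assuming read/2 returns a list of entries as in the transcripts elsewhere on this page:

    %% Wrap each entry with its write time so residency can be computed at read.
    Write = fun(Buffer, Data) ->
                ets_buffer:write(Buffer, {erlang:monotonic_time(millisecond), Data})
            end,
    Read  = fun(Buffer) ->
                case ets_buffer:read(Buffer, 1) of
                    [{T0, Data}] -> {erlang:monotonic_time(millisecond) - T0, Data};
                    []           -> empty
                end
            end.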

Add deferred option for cxy_ctl

gen_stream used buffer striping that was driven by access: a new concurrent process did not execute until there was enough demand. Users can achieve this programmatically with maybe_execute_task.

A 'deferred' option would allow up to Max spawns to occur, after which the spawning mechanism pauses until the count drops below Max before allowing the next task to start. This involves maintaining up to Max running processes and a queue of deferred processes (the queue is the synchronization barrier).

Another option is to spawn as often as needed, but not to start computation when over the Max. Processes would only proceed when given a signal to do so. This is itself a form of synchronization.

The 2nd option allows a more general implementation of synchronization barriers at the expense of more processes and more memory, but deferred tasks are quicker to start running because the spawn cost has already been amortized. A sketch of this option follows.
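
A minimal sketch of the second option: processes are spawned eagerly but block until released, with the 'go' message standing in for whatever signal the limiter would send. This is not existing cxy_ctl API:

    Spawn_Gated = fun(Task) ->
                      spawn(fun() -> receive go -> Task() end end)   %% block until released
                  end,
    Pids = [Spawn_Gated(fun() -> io:format("task ~p~n", [N]) end) || N <- lists:seq(1, 3)],
    [P ! go || P <- Pids].   %% the limiter would release these as capacity frees up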

Consider adding a high-water mark on ets_buffer / cxy_ctl

ets_buffer can be used to collect data from many writers while the data is periodically read and processed. When the consumer falls behind, the ets_buffer will grow in size. It would be nice to know the largest size reached since the high-water mark was last reset.

Same goes for the number of concurrent processes when using the concurrency limiter.
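
A minimal sketch of race-safe high-water tracking with the atomics module (slot layout and names are illustrative):

    HWM = atomics:new(2, []),   %% slot 1: current size; slot 2: high-water mark
    Update_Max = fun Loop(Size) ->
        Max = atomics:get(HWM, 2),
        case Size =< Max of
            true  -> ok;
            false ->
                case atomics:compare_exchange(HWM, 2, Max, Size) of
                    ok     -> ok;
                    _Raced -> Loop(Size)   %% another writer won; retry against the new max
                end
        end
    end,
    Record_Write = fun() -> Update_Max(atomics:add_get(HWM, 1, 1)) end.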

Concurrency bug?

Hi,

I am trying to write 20,000 messages from one process to a ring buffer of size 10,000, and poll that buffer in a separate process. The unexpected behavior is that about 30,000 messages are polled while only 20,000 were written in total.

f(),
Cre = fun() ->
          Self = self(),
          Poll = fun(Cont) ->
                     case ets_buffer:read_dedicated(srb, 100) of
                         [] ->
                             timer:sleep(50),
                             Cont(Cont);
                         Chunk when is_list(Chunk) ->
                             Self ! {chunk, Chunk},
                             Cont(Cont);
                         {missing_ets_data, _, _} ->
                             erlang:yield(),
                             Cont(Cont)
                     end
                 end,
          spawn_link(fun() -> ets_buffer:create_dedicated(srb, ring, 10000), Poll(Poll) end)
      end.

[ets_buffer:write_dedicated(srb, {msg, N}) || N <- lists:seq(1, 20000)].

false = (20000 >= length(lists:flatten(
    [receive {chunk, L} -> L end
     || _ <- lists:seq(1, element(2, erlang:process_info(self(), message_queue_len)))]))).

Improve cxy_ctl error messages

Is it possible for the cxy tools to give a plainer structure for errors? Errors are currently thrown like {error, {mfa_failure, …}}, and if the caller actually needs the error reason, it would be useful to extract it with simple pattern matching.
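
For illustration, the two matching styles side by side (the error shapes here are illustrative):

    Unwrap = fun({error, {mfa_failure, Reason}}) -> Reason;   %% today: caller must know the wrapper
                ({error, Reason})                -> Reason    %% requested: simple flat match
             end.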

make edoc fails

make edoc fails with the following error:

[epocxy] make edoc
 GEN    distclean-edoc
 GEN    edoc
./src/cxy_cache.erl, function reserve/2: at line 113: multiple @spec tag.
./src/cxy_cache.erl, function reserve/2: at line 110: multiple @spec tag.
./src/cxy_cache.erl, function reserve/2: at line 109: multiple @spec tag.
./src/cxy_cache.erl, function reserve/2: at line 108: multiple @spec tag.
./src/cxy_cache.erl, function reserve/2: at line 106: multiple @spec tag.
./src/cxy_cache.erl, function reserve/2: at line 105: multiple @spec tag.
./src/cxy_cache.erl, function reserve/2: at line 103: multiple @spec tag.
./src/cxy_cache.erl, function reserve/2: at line 100: multiple @spec tag.
./src/cxy_cache.erl, function reserve/2: at line 99: multiple @spec tag.
./src/cxy_cache.erl, function reserve/2: at line 98: multiple @spec tag.
edoc: skipping source file './src/cxy_cache.erl': {'EXIT',error}.
./src/cxy_cache_fsm.erl, function start_link/2: at line 51: multiple @spec tag.
./src/cxy_cache_fsm.erl, function start_link/2: at line 49: multiple @spec tag.
./src/cxy_cache_fsm.erl, function start_link/2: at line 48: multiple @spec tag.
edoc: skipping source file './src/cxy_cache_fsm.erl': {'EXIT',error}.
./src/cxy_cache_sup.erl, function start_link/0: at line 39: multiple @spec tag.
./src/cxy_cache_sup.erl, function start_link/0: at line 38: multiple @spec tag.
./src/cxy_cache_sup.erl, function start_link/0: at line 37: multiple @spec tag.
edoc: skipping source file './src/cxy_cache_sup.erl': {'EXIT',error}.
./src/cxy_ctl.erl, function update_spawn_times/5: at line 168: multiple @spec tag.
edoc: skipping source file './src/cxy_ctl.erl': {'EXIT',error}.
./src/cxy_fount.erl, function start_link/4: at line 178: multiple @spec tag.
edoc: skipping source file './src/cxy_fount.erl': {'EXIT',error}.
./src/cxy_fount_sup.erl, function start_link/2: at line 33: multiple @spec tag.
./src/cxy_fount_sup.erl, function start_link/2: at line 32: multiple @spec tag.
edoc: skipping source file './src/cxy_fount_sup.erl': {'EXIT',error}.
./src/cxy_regulator.erl, function start_link/0: at line 65: multiple @spec tag.
edoc: skipping source file './src/cxy_regulator.erl': {'EXIT',error}.
./src/cxy_synch.erl, function before_task/2: at line 41: multiple @spec tag.
edoc: skipping source file './src/cxy_synch.erl': {'EXIT',error}.
./src/ets_buffer.erl, function buffer_type/1: at line 123: multiple @spec tag.
edoc: skipping source file './src/ets_buffer.erl': {'EXIT',error}.
edoc: error in doclet 'edoc_doclet': {'EXIT',error}.
{"init terminating in do_boot",error}

Crash dump is being written to: erl_crash.dump...done
init terminating in do_boot (error)
make: *** [edoc] Error 1

Generational caching should be generalized as a behaviour

There are four currently requested features related to "generations":

  1. Generational caching (time based, access based, and injected functional calls)
  2. Windowed ets_buffer collection (accumulate transactions, batch in a window, clear)
  3. Uuid / random pre-generation (2 ets_buffers: fill one with pre-generated data, fill the 2nd, and wait for an empty trigger)
  4. High volume refresh (cycle between two buffers with updates displayed periodically)

These may or may not make sense for generalization, but the general model is (sketched after the list):

  1. Create 2 buffers / collections (like double-buffered animation)
  2. Trigger switching from one buffer to another (time, access count, injected event)
  3. Application atomically reads from currently active buffer
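
A minimal sketch of that double-buffer model (table names and the trigger are illustrative):

    Meta = ets:new(meta, [set, public]),
    A    = ets:new(gen_a, [set, public]),
    B    = ets:new(gen_b, [set, public]),
    ets:insert(Meta, [{buffers, A, B}, {active, A}]),
    Active = fun() -> [{active, T}] = ets:lookup(Meta, active), T end,
    Switch = fun() ->   %% trigger: time, access count, or injected event
        [{buffers, X, Y}] = ets:lookup(Meta, buffers),
        Next = case Active() of X -> Y; _ -> X end,
        ets:delete_all_objects(Next),        %% clear the buffer being promoted
        ets:insert(Meta, {active, Next}),
        Next
    end.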

ets_buffer:create overwrites metadata without warning

If you attempt to create a new ets_buffer which already exists, the ets:new call is silently skipped and the metadata is then reinitialized, wreaking havoc on any existing buffered data.

Create should refuse to execute if the ets table already exists.
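
Callers can approximate the desired guard today. A minimal sketch for dedicated buffers, where the buffer name doubles as the ets table name:

    Safe_Create = fun(Name, Type, Size) ->
        case ets:info(Name, id) of
            undefined -> ets_buffer:create_dedicated(Name, Type, Size);
            _Existing -> {error, {buffer_already_exists, Name}}
        end
    end.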

Add "process geyser" pattern

A "process geyser" consists of pre-spawned processes that are intended to be used once. Rather than a worker pool where processes are checked out, returned and reused, the geyser is a fount of new processes always at the ready (hopefully). Stale processes in a worker pool can become fragmented, or have leftover bits in their process dictionaries.

The pattern works as follows:

  1. Create a FIFO queue
  2. Spawn N pages of M gen_fsm processes (option to hibernate or not)
    a) Place each process in the FIFO queue
    b) Every process starts in the ready_to_execute state
    c) Every Mth process starts in the spawn_more state
       - spawns a process which spawns M more processes
       - then starts processing immediately

When a task should be executed, the initiator pops the next process off the FIFO queue and sends it a task message. The process runs to completion. The FIFO queue should never be empty.

Caveat: periodically the entire FIFO queue should be reset so that the index numbers don't become bignums.
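
A minimal sketch of the single-use FIFO idea (it refills one worker per use rather than spawning pages of M as described above; all names are illustrative):

    Spawn_Worker = fun() -> spawn(fun() -> receive {task, T} -> T() end end) end,
    New_Geyser   = fun(N) -> queue:from_list([Spawn_Worker() || _ <- lists:seq(1, N)]) end,
    Run = fun(Q0, Task) ->
              {{value, Pid}, Q1} = queue:out(Q0),
              Pid ! {task, Task},                %% worker runs exactly once and exits
              queue:in(Spawn_Worker(), Q1)       %% top the fount back up
          end.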


Race condition in cxy_cache:create_new_value

There is a race condition when a cached value is requested by 2 different processes and it doesn't exist in the cache. Both processes check both generations, then attempt to generate a new value to cache. One stores into the cache first and is currently (as of v0.9.8a) the winner, with the 2nd generated value ignored. Since it is not possible to independently determine which process generated the later value, a new explicit interface needs to be added:

create_key_value(Key) -> {Version, Value}.
is_later_version(Vsn1, Vsn2) -> boolean().

The 2nd callback is optional. It defaults to:

is_later_version(Vsn1, Vsn2) -> Vsn1 < Vsn2.

The cache should always preserve the later version when there is a race collision on inserts.
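
A sketch of that insert-side resolution using the proposed callbacks (Mod is the cache's callback module; the value shapes are illustrative):

    Resolve = fun(Mod, {V1, _} = Cached, {V2, _} = Candidate) ->
                  case Mod:is_later_version(V1, V2) of
                      true  -> Candidate;   %% candidate is later: it wins the collision
                      false -> Cached       %% cached value is already the later version
                  end
              end.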

Add cxy_ctl_sup to properly own ets tables

Calling cxy_ctl:init or cxy_ctl:add_task_types causes a new ets table to be created, with the caller as its owner. If a client application calls these library functions from the wrong process, the ets table can be deleted accidentally when the calling process terminates.

To make the library safer to use, a supervisor should be provided that starts a gen_server dedicated to owning the ets tables. The API for this gen_server should support creating and adjusting the limits, but leave it up to the client to directly call the cxy_ctl functions for executing tasks.
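
A minimal sketch of such an owner process (module name, state, and supervision wiring are illustrative; see cxy_ctl's docs for the exact shape of the limits argument):

    -module(cxy_ctl_owner).
    -behaviour(gen_server).
    -export([start_link/1]).
    -export([init/1, handle_call/3, handle_cast/2]).

    start_link(Limits) ->
        gen_server:start_link({local, ?MODULE}, ?MODULE, Limits, []).

    init(Limits) ->
        _ = cxy_ctl:init(Limits),   %% tables created here are owned by this long-lived server
        {ok, no_state}.

    handle_call(_Req, _From, State) -> {reply, ok, State}.
    handle_cast(_Msg, State)        -> {noreply, State}.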

Calling cxy_cache_sup:start_cache/3 succeeds with bad param

The third argument should be a function, and the code does check is_function(Gen_Fun, 3), but the call does not crash when given a non-function.

Create a test case that passes a wrongly typed third argument (the example was 1000) and ensure that cxy_cache_fsm:start_link/3 fails when the arglist is wrong.
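
A minimal eunit sketch of the requested test; the argument shapes are assumed from the issue text, and the assertion should be adjusted if the fix returns {error, _} instead of crashing:

    -module(start_cache_badarg_tests).
    -include_lib("eunit/include/eunit.hrl").

    %% 1000 is not an arity-3 fun, so this call should no longer succeed.
    bad_third_arg_test() ->
        ?assertException(error, _, cxy_cache_sup:start_cache(test_cache, test_mod, 1000)).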

Partially full ring buffers probably crash

If a ring buffer is partially filled and enough reads are executed to move past all the written entries, the next read is likely to crash the caller, since the ets entry won't exist.
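
A minimal repro sketch along those lines (the call shapes follow the transcripts elsewhere on this page; the crash itself is conjecture, which is why the title says "probably"):

    ets_buffer:create(part, ring, 100),              %% capacity 100...
    [ets_buffer:write(part, N) || N <- lists:seq(1, 3)],   %% ...but only 3 entries written
    [ets_buffer:read(part, 1) || _ <- lists:seq(1, 5)].    %% reads 4 and 5 pass the written region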

`make tests` fails: Test run failed! Reason: {'EXIT',{error,{enoent,"src/"}}}

Erlang/OTP 17 [erts-6.1] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false]
Eshell V6.1 (abort with ^G)
OSX 10.8.5

make tests
 GEN    clean
 GEN    clean-app
 GEN    clean-ct
./rebar get-deps
==> proper (get-deps)
./rebar compile
==> proper (compile)
make[2]: `include/compile_flags.hrl' is up to date.
 ERLC   cxy_cache.erl cxy_cache_fsm.erl cxy_cache_sup.erl cxy_ctl.erl cxy_synch.erl ets_buffer.erl
Old inliner: threshold=0 functions=[{return_cache_gen1,2},
                                    {return_cache_gen2,2},
                                    {return_cache_refresh,2},
                                    {return_cache_error,2},
                                    {return_cache_miss,2},
                                    {return_cache_delete,2},
                                    {return_and_count_cache,4}]
Old inliner: threshold=0 functions=[{buffer_type,1},
                                    {buffer_type_num,1},
                                    {meta_key,1},
                                    {buffer_key,2},
                                    {buffer_data,3},
                                    {ring_reserve_write_cmd,1},
                                    {ring_reserve_read_cmd,4},
                                    {ring_reserve_read_all_cmd,1},
                                    {fifo_publish_write_cmd,0},
                                    {fifo_reserve_read_cmd,2},
                                    {fifo_reserve_read_all_cmd,2},
                                    {lifo_reserve_write_cmd,0},
                                    {get_buffer_type,2},
                                    {get_buffer_readw,2},
                                    {get_buffer_write,2},
                                    {get_buffer_type_and_pos,3},
                                    {insert_ets_internal,4},
                                    {set_high_water,3},
                                    {set_high_water_cmd,1}]
 APP    epocxy.app.src
 GEN    build-ct-suites
Common Test v1.8.1 starting (cwd is /***dev/erlang/epocxy)
CWD set to: "/***/dev/erlang/epocxy/logs/[email protected]_23.06.51"
TEST INFO: 3 test(s), 39 case(s) in 3 suite(s)
Updating /***/dev/erlang/epocxy/logs/all_runs.html... done
Test run failed! Reason:
{'EXIT',{error,{enoent,"src/"}}}
make: *** [tests-ct] Error 2

Concurrent 1 writer and 1 reader causes ets_buffer:read to return {missing_ets_data, buffer_name, N}

Hi,
I've simulated a producer/consumer scheme with an ets_buffer of type 'ring'.

  1. 1 writer process which, in a loop (without explicit sleep), calls ets_buffer:write
  2. 1 reader process which reads in a loop (tried both without explicit sleep and with a 5 ms sleep)
  3. the size of the ring buffer is 200

Is this a situation (returning {missing_ets_data, buffer_name, -N}) which the reader should tolerate in some way? If so, which way is most correct? Experimentally I've discovered that if I keep reading, then after some number of attempts (approx. N) I start to receive normal data again.
Or is this still some kind of concurrency bug, such that if the situation occurs the ring buffer must be considered 'broken' and no longer used by the program? The documentation for 'missing_ets_data' is not clear to me.
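
For reference, a retry loop of the kind described; whether masking missing_ets_data like this is safe is exactly the question being asked:

    Read_Retry = fun Loop(_Buffer, 0) -> give_up;
                     Loop(Buffer, Attempts) ->
                         case ets_buffer:read(Buffer, 1) of
                             {missing_ets_data, _Name, _N} ->
                                 erlang:yield(),
                                 Loop(Buffer, Attempts - 1);   %% tolerate and retry
                             Result ->
                                 Result
                         end
                 end.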

Add API calls to spawn in cxy_ctl with initial process dictionary values

Sometimes 3rd-party frameworks / libraries use the process dictionary (I'm looking at you, lager). The spawned process should continue running in the stateful context of the originating process, so a portion or all of the process dictionary needs to be copied to the newly spawned process before the new function executes.

The safest, fastest way to do this without an overly complicated API for every variation is likely to provide one extra argument consisting of a process dictionary (or a subset of one) in the list format returned by get(). The calling process constructs or fetches the Key/Value list before calling spawn, then uses a new API spawn entry that interprets the first argument as a process dictionary to inject into the new process.
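
A minimal sketch of the injection itself (Spawn_With_Dict is illustrative; the real entry point would live in cxy_ctl):

    Spawn_With_Dict = fun(Dict, Task) ->
        spawn(fun() ->
                  [put(K, V) || {K, V} <- Dict],   %% seed the new pdict before running
                  Task()
              end)
    end,
    Spawn_With_Dict(get(), fun() -> io:format("~p~n", [get()]) end).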

No correct way to invalidate cache entries

cxy_cache:delete_item is currently used to invalidate a cache entry. This causes a race condition which can leave a stale value in the cache indefinitely.

  1. Process 2 deletes entry to invalidate
  2. Stale value fetched from DB in process 1 via create_new_key
  3. Process 2 updates DB
  4. Stale value from process 1 is inserted to cache
  5. Process 2 reads from cache and finds stale value (doesn't call create_new_key)

A safer approach is to create a new function refresh_item (sketched after this list) which:

  1. Calls create_key_value to get {Version, Value}
  2. Fetches cached {Version, Value}
  3. Performs conflict resolution to determine the newer Version and inserts the new value into the cache if necessary
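
A sketch of that flow against a plain ets table, reusing the create_key_value / is_later_version callback shapes proposed in the race-condition issue above:

    Refresh_Item = fun(Tab, Mod, Key) ->
        {New_Vsn, New_Val} = Mod:create_key_value(Key),
        New = {New_Vsn, New_Val},
        case ets:lookup(Tab, Key) of
            [{_, {Old_Vsn, _} = Old}] ->
                case Mod:is_later_version(Old_Vsn, New_Vsn) of
                    true  -> ets:insert(Tab, {Key, New}), New;
                    false -> Old   %% cached version is already later; keep it
                end;
            [] ->
                ets:insert(Tab, {Key, New}),
                New
        end
    end.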

Do controlled shutdown when cxy_ctl:remove_task_types is called

Currently remove_task_types deletes a necessary record from the cxy_ctl table and eliminates the ets ring buffer used to record execution timing. Either of these changes can cause a running process which attempts to execute a task to crash with badarg or worse.

Instead, this function should set Max Active Procs to 0 and wait for all spawned and inline procs to stop before deleting anything from the ets table.
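
A sketch of the drain loop implied by that suggestion; Set_Max_Active, Active_Count, and Delete_Task_Type are hypothetical stand-ins for the corresponding cxy_ctl internals:

    Drain = fun Wait(Task_Type) ->
                case Active_Count(Task_Type) of
                    0 -> Delete_Task_Type(Task_Type);   %% nothing running: safe to delete
                    _ -> timer:sleep(10), Wait(Task_Type)
                end
            end.
    %% Caller: Set_Max_Active(Task_Type, 0), then Drain(Task_Type).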

Spawn a function hook on generational cache elimination

When the oldest generation is dumped, the associated items may have related information that should be cleaned up. The delete of an ets table should be preceded by spawning a process on the list of elements in the soon-to-be-discarded table. The newly spawned process can work through the elements more methodically (at the expense of keeping them all in memory longer).
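
A minimal sketch of that hook (Cleanup_Fun is a user-supplied arity-1 fun; names are illustrative):

    Expire_Generation = fun(Tab, Cleanup_Fun) ->
        Elements = ets:tab2list(Tab),   %% snapshot keeps all elements in memory longer
        spawn(fun() -> lists:foreach(Cleanup_Fun, Elements) end),
        ets:delete(Tab)
    end.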

cxy_fount behaviour requires init/1 but is called as init/*

The fount behaviour requires init to be declared taking one argument, but when cxy_fount invokes the init callback it applies the "init args" directly as the argument list, rather than, as I suspect was intended, a list whose single element is the "Mod_Init_Args" variable (in version 1.1.0, cxy_fount.erl:422).
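
For illustration, the two apply/3 forms side by side (mymod and the argument list are hypothetical):

    %% Declared by the behaviour (arity 1): the init args arrive as one list.
    %%   erlang:apply(mymod, init, [Mod_Init_Args])  %% calls mymod:init/1
    %% What cxy_fount appears to do instead:
    %%   erlang:apply(mymod, init, Mod_Init_Args)    %% calls mymod:init/length(Mod_Init_Args)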

cxy_ctl:execute_wrapper assumes ets missing on badarg

The catch of error:badarg assumes the ets table is missing, but the badarg could just as well come from the apply argument list or from the function body. The catch clause should check ets:info on the named table before reporting whether there is an ets error or not.
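
A sketch of the suggested disambiguation (Classify and the error terms are illustrative, not the current execute_wrapper code):

    Classify = fun(Tab, Mod, Fun, Args) ->
        try erlang:apply(Mod, Fun, Args)
        catch error:badarg ->
            case ets:info(Tab) of
                undefined -> {error, {missing_ets_table, Tab}};          %% table really is gone
                _Info     -> {error, {badarg_in_mfa, {Mod, Fun, Args}}}  %% badarg came from the MFA
            end
        end
    end.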

ring buffer bug

Hello!

Does anyone have an idea why this is happening? :)

Interactive Elixir (1.12.2) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> :ets_buffer.create(:small, :ring, 6)
:ets_buffer
iex(2)> :ets_buffer.write(:small, "1")
1
iex(3)> :ets_buffer.write(:small, "2")
2
iex(4)> :ets_buffer.write(:small, "3")
3
iex(5)> :ets_buffer.write(:small, "4")
4
iex(6)> :ets_buffer.read(:small, 1)
["1"]
iex(7)> :ets_buffer.read(:small, 1)
["2"]
iex(8)> :ets_buffer.read(:small, 1)
["3"]
iex(9)> :ets_buffer.read(:small, 1)
["4"]
iex(10)> :ets_buffer.read(:small, 1)
[]
iex(11)> :ets_buffer.write(:small, "5")
1
iex(12)> :ets_buffer.write(:small, "6")
2
iex(13)> :ets_buffer.write(:small, "7")
3
iex(14)> :ets_buffer.read(:small, 1)
["5", "6"]
iex(15)> :ets_buffer.read(:small, 1)
{:missing_ets_data, :small, -1}
iex(16)> :ets_buffer.read(:small, 1)
["7"]
iex(17)> :ets_buffer.read(:small, 1)
[]
iex(18)>

On iex line 14 the ring buffer suddenly returns 2 elements even though only 1 was requested, and on line 15 a :missing_ets_data error.

Add cxy_ctl:adjust_task_limits

This function would allow the ceiling on the number of concurrent tasks to be changed. If it is lowered below the current number of executing processes, no new spawns will occur until the running processes gradually die off and the count drops below the new threshold (which may conceivably never happen if you use very long-lived processes).
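
A minimal sketch of the admission check implied by that behavior (names are hypothetical, not cxy_ctl API):

    Maybe_Spawn = fun(Active, Max, Task) when Active < Max ->
                          {spawned, spawn(Task)};
                     (_Active, _Max, _Task) ->
                          paused_until_below_limit   %% running procs must exit first
                  end.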
