duomark / epocxy
Erlang Patterns of Concurrency
License: Other
The cache can grow endlessly; it would be a good idea to have an optional policy to flush/delete items from the cache based on an LRU.
Currently, to get the oldest entry you must list the whole table using history_timestamped. When the buffer is large, this can crash the VM by running out of memory. The following functions should be added:
Hi,
I found that the name is rewritten when starting a Fount by this function. What do you think about removing it? As it stands, it requires more work to retrieve the supervisor using its name. Thoughts?
Ring buffers are great for concurrent writers because they have a natural cap on memory used and overwrite the oldest entries automatically.
Ring buffers for readers can also be great for tasks like load-balancing in a round-robin fashion by just continuously reading the next element and allowing wraparound on reads.
The current implementation deletes entries from LIFO/FIFO buffers when they are read, but not from RING buffers; however, it only allows write wraparound on RING buffers. A read-wraparound RING type needs to be introduced.
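For illustration, read wraparound for round-robin dispatch can be sketched without ets at all (next_worker is a hypothetical helper, not part of ets_buffer):

```erlang
%% Hypothetical helper (not part of the ets_buffer API): round-robin
%% selection with read wraparound over a fixed tuple of workers.
next_worker(Workers, Idx) ->
    Pos = (Idx rem tuple_size(Workers)) + 1,  % wrap around past the last slot
    {element(Pos, Workers), Idx + 1}.
```

A read-wraparound RING buffer generalizes this by letting the read cursor lap the write cursor instead of halting at it.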
A common problem when working with buffers is how long entries remain in the buffer. Consider adding timestamps to buffer entries and statistics about how long entries persist.
gen_stream used buffer striping driven by access: a new concurrent process didn't execute until there was enough demand. This can be done programmatically by the user with maybe_execute_task.
One option is a 'deferred' mode, so that up to a Max of spawns occurs, after which the spawning mechanism pauses until the count drops below Max before allowing the next to start. This involves maintaining Max running processes and a queue of deferred processes (the queue is the synchronization barrier).
Another option is to spawn as often as needed, but not to start computation when over the Max. Processes could only proceed when given a signal to do so, which is itself a form of synchronization.
The second option allows a more general implementation of synchronization barriers at the expense of more processes and more memory, but tasks are quicker to start running, having already amortized the spawn cost.
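A minimal sketch of the second option, assuming nothing from cxy_synch (both function names are made up): workers are spawned immediately but block on a signal before computing.

```erlang
%% Hypothetical sketch: spawn immediately, gate computation on a signal.
spawn_gated(Fun) when is_function(Fun, 0) ->
    spawn(fun() ->
              %% The spawn cost is already paid; computation waits.
              receive go -> Fun() end
          end).

%% Releasing a gated worker is the synchronization point.
release(Pid) -> Pid ! go.
```

Releasing all gated pids at once turns this into a simple barrier.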
ets_buffer can be used to collect data from many writers, which a consumer periodically reads and processes. When the consumer falls behind, the ets_buffer will grow in size. It would be nice to know the largest size since the high-water mark was last reset.
The same goes for the number of concurrent processes when using the concurrency limiter.
Hi,
I am writing 20,000 messages from one process to a ring buffer of size 10,000, and polling that buffer in a separate process. The unexpected behavior is that 30,000 messages were polled while only 20,000 were written in total.
f(),
Cre = fun() ->
          Self = self(),
          Poll = fun(Cont) ->
                     case ets_buffer:read_dedicated(srb, 100) of
                         [] ->
                             timer:sleep(50),
                             Cont(Cont);
                         Chunk when is_list(Chunk) ->
                             Self ! {chunk, Chunk},
                             Cont(Cont);
                         {missing_ets_data, _, _} ->
                             erlang:yield(),
                             Cont(Cont)
                     end
                 end,
          spawn_link(fun() ->
                         ets_buffer:create_dedicated(srb, ring, 10000),
                         Poll(Poll)
                     end)
      end.
[ets_buffer:write_dedicated(srb, {msg, N}) || N <- lists:seq(1, 20000)].
false = (20000 >= length(lists:flatten(
            [receive {chunk, L} -> L end
             || N <- lists:seq(1, element(2, erlang:process_info(self(), message_queue_len)))]))).
Is it possible (in the cxy tools) to give a plainer structure for errors? Errors are currently thrown like {error, {mfa_failure, …}}, and if the caller actually needs the error reason, it would be useful to extract it with simple pattern matching.
Running make edoc fails with the following error:
[epocxy] make edoc 16:46:59 ☁ master ☀
GEN distclean-edoc
GEN edoc
./src/cxy_cache.erl, function reserve/2: at line 113: multiple @spec tag.
./src/cxy_cache.erl, function reserve/2: at line 110: multiple @spec tag.
./src/cxy_cache.erl, function reserve/2: at line 109: multiple @spec tag.
./src/cxy_cache.erl, function reserve/2: at line 108: multiple @spec tag.
./src/cxy_cache.erl, function reserve/2: at line 106: multiple @spec tag.
./src/cxy_cache.erl, function reserve/2: at line 105: multiple @spec tag.
./src/cxy_cache.erl, function reserve/2: at line 103: multiple @spec tag.
./src/cxy_cache.erl, function reserve/2: at line 100: multiple @spec tag.
./src/cxy_cache.erl, function reserve/2: at line 99: multiple @spec tag.
./src/cxy_cache.erl, function reserve/2: at line 98: multiple @spec tag.
edoc: skipping source file './src/cxy_cache.erl': {'EXIT',error}.
./src/cxy_cache_fsm.erl, function start_link/2: at line 51: multiple @spec tag.
./src/cxy_cache_fsm.erl, function start_link/2: at line 49: multiple @spec tag.
./src/cxy_cache_fsm.erl, function start_link/2: at line 48: multiple @spec tag.
edoc: skipping source file './src/cxy_cache_fsm.erl': {'EXIT',error}.
./src/cxy_cache_sup.erl, function start_link/0: at line 39: multiple @spec tag.
./src/cxy_cache_sup.erl, function start_link/0: at line 38: multiple @spec tag.
./src/cxy_cache_sup.erl, function start_link/0: at line 37: multiple @spec tag.
edoc: skipping source file './src/cxy_cache_sup.erl': {'EXIT',error}.
./src/cxy_ctl.erl, function update_spawn_times/5: at line 168: multiple @spec tag.
edoc: skipping source file './src/cxy_ctl.erl': {'EXIT',error}.
./src/cxy_fount.erl, function start_link/4: at line 178: multiple @spec tag.
edoc: skipping source file './src/cxy_fount.erl': {'EXIT',error}.
./src/cxy_fount_sup.erl, function start_link/2: at line 33: multiple @spec tag.
./src/cxy_fount_sup.erl, function start_link/2: at line 32: multiple @spec tag.
edoc: skipping source file './src/cxy_fount_sup.erl': {'EXIT',error}.
./src/cxy_regulator.erl, function start_link/0: at line 65: multiple @spec tag.
edoc: skipping source file './src/cxy_regulator.erl': {'EXIT',error}.
./src/cxy_synch.erl, function before_task/2: at line 41: multiple @spec tag.
edoc: skipping source file './src/cxy_synch.erl': {'EXIT',error}.
./src/ets_buffer.erl, function buffer_type/1: at line 123: multiple @spec tag.
edoc: skipping source file './src/ets_buffer.erl': {'EXIT',error}.
edoc: error in doclet 'edoc_doclet': {'EXIT',error}.
{"init terminating in do_boot",error}
Crash dump is being written to: erl_crash.dump...done
init terminating in do_boot (error)
make: *** [edoc] Error 1
There are three currently requested features related to "generations":
These may or may not make sense for generalization, but the model in general is:
If you attempt to create a new ets_buffer that already exists, the ets:new call is silently skipped and the metadata is reinitialized, wreaking havoc on any existing buffered data.
Create should refuse to execute if the ets table already exists.
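A guard along those lines might look like this sketch (create_unless_exists is a hypothetical wrapper, not the epocxy API; it assumes dedicated buffers are named ets tables):

```erlang
%% Hypothetical wrapper: refuse creation when the named table already exists.
create_unless_exists(Buffer_Name, Type, Size) ->
    case ets:info(Buffer_Name, name) of
        undefined -> ets_buffer:create_dedicated(Buffer_Name, Type, Size);
        _Exists   -> {error, {already_exists, Buffer_Name}}
    end.
```

ets:info/2 returns undefined for a nonexistent table, so the existing data is never touched.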
A "process geyser" consists of pre-spawned processes that are intended to be used once. Rather than a worker pool where processes are checked out, returned and reused, the geyser is a fount of new processes always at the ready (hopefully). Stale processes in a worker pool can become fragmented, or have leftover bits in their process dictionaries.
The pattern works as follows:
When a task should be executed, the initiator pops the next process off the FIFO queue and sends it a task message. The process runs to completion. The FIFO queue should never be empty.
Caveat: periodically the entire FIFO queue should be reset so that the index numbers don't become bignums.
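The pattern above can be sketched as follows (a simplified toy, not the cxy_fount implementation; all names are made up):

```erlang
%% Toy geyser: a FIFO queue of pre-spawned, single-use workers.
new_geyser(N) ->
    queue:from_list([spawn_worker() || _ <- lists:seq(1, N)]).

%% Each worker handles exactly one task, then exits: no stale state.
spawn_worker() ->
    spawn(fun() -> receive {task, Fun} -> Fun() end end).

%% Pop the next worker, hand it a task, and replenish the queue so it
%% never runs empty.
run_task(Q, Fun) ->
    {{value, Pid}, Q1} = queue:out(Q),
    Pid ! {task, Fun},
    queue:in(spawn_worker(), Q1).
```

Replenishing on every pop is what keeps the fount "always at the ready".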
There is a race condition when a cached value is requested by two different processes when it doesn't exist in the cache. Both processes check both generations, then attempt to generate a new value to cache. One stores into the cache first and is currently (as of v0.9.8a) the winner, with the second generated value ignored. Since it is not possible to independently determine which process has generated the later value, a new explicit interface needs to be added:
create_key_value(Key) -> {Version, Value}.
is_later_version(Vsn1, Vsn2) -> Vsn_Which_Is_Newer.
The 2nd callback is optional. It defaults to:
is_later_version(Vsn1, Vsn2) -> Vsn1 < Vsn2.
The cache should always preserve the later version when there is a race collision on inserts.
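Assuming the boolean default shown above, the insert-collision resolution might look like this sketch (resolve_insert is a hypothetical helper, not the cxy_cache implementation):

```erlang
is_later_version(Vsn1, Vsn2) -> Vsn1 < Vsn2.  % default callback from above

%% Hypothetical helper: keep whichever {Version, Value} pair is later.
resolve_insert({V_Old, _} = Old, {V_New, _} = New) ->
    case is_later_version(V_Old, V_New) of
        true  -> New;   % the racing insert is newer: overwrite
        false -> Old    % the cached value is already later: keep it
    end.
```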
Calling cxy_ctl:init or cxy_ctl:add_task_types causes a new ets table to be created, owned by the calling process. If a client application calls these library functions from the wrong process, the ets table might be deleted accidentally when the caller's process terminates.
To make the library safer to use, a supervisor should be provided that starts a gen_server dedicated to owning the ets tables. The API for this gen_server should support creating and adjusting the limits, but leave it up to the client to directly call the cxy_ctl functions for executing tasks.
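A sketch of such an owner process (the module name, call shapes, and cxy_ctl arities here are assumptions, not the epocxy API):

```erlang
-module(cxy_ctl_owner).  % hypothetical module name
-behaviour(gen_server).
-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2]).

%% Start a long-lived owner so the cxy_ctl ets tables are never tied
%% to a transient client process.
start_link(Limits) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, Limits, []).

init(Limits) ->
    cxy_ctl:init(Limits),   % tables are created here, owned by this server
    {ok, Limits}.

%% Limit creation/adjustment goes through the server...
handle_call({add_task_types, Types}, _From, State) ->
    {reply, cxy_ctl:add_task_types(Types), State}.

handle_cast(_Msg, State) -> {noreply, State}.

%% ...but clients still call the cxy_ctl task-execution functions directly.
```

Placing this gen_server under a supervisor gives the tables the lifetime of the application rather than of any caller.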
The third argument should be a function, and the code does check is_function(Gen_Fun, 3), but the call does not crash. Create a test case passing a wrongly typed third argument (the example was 1000) and ensure that cxy_cache_fsm:start_link/3 fails when the arglist is wrong.
Writing to an ets_buffer should return the number of elements now present in the buffer, so that an application can react if this is the first write, or respond with appropriate logic when a threshold it cares about is crossed.
If a ring buffer is partially filled, and enough reads are executed to pass all the written entries, the next read is likely to crash the caller since the ets entry won't exist.
Erlang/OTP 17 [erts-6.1] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false]
Eshell V6.1 (abort with ^G)
OSX 10.8.5
make tests
GEN clean
GEN clean-app
GEN clean-ct
./rebar get-deps
==> proper (get-deps)
./rebar compile
==> proper (compile)
make[2]: `include/compile_flags.hrl' is up to date.
ERLC cxy_cache.erl cxy_cache_fsm.erl cxy_cache_sup.erl cxy_ctl.erl cxy_synch.erl ets_buffer.erl
Old inliner: threshold=0 functions=[{return_cache_gen1,2},
{return_cache_gen2,2},
{return_cache_refresh,2},
{return_cache_error,2},
{return_cache_miss,2},
{return_cache_delete,2},
{return_and_count_cache,4}]
Old inliner: threshold=0 functions=[{buffer_type,1},
{buffer_type_num,1},
{meta_key,1},
{buffer_key,2},
{buffer_data,3},
{ring_reserve_write_cmd,1},
{ring_reserve_read_cmd,4},
{ring_reserve_read_all_cmd,1},
{fifo_publish_write_cmd,0},
{fifo_reserve_read_cmd,2},
{fifo_reserve_read_all_cmd,2},
{lifo_reserve_write_cmd,0},
{get_buffer_type,2},
{get_buffer_readw,2},
{get_buffer_write,2},
{get_buffer_type_and_pos,3},
{insert_ets_internal,4},
{set_high_water,3},
{set_high_water_cmd,1}]
APP epocxy.app.src
GEN build-ct-suites
Common Test v1.8.1 starting (cwd is /***dev/erlang/epocxy)
CWD set to: "/***/dev/erlang/epocxy/logs/[email protected]_23.06.51"
TEST INFO: 3 test(s), 39 case(s) in 3 suite(s)
Updating /***/dev/erlang/epocxy/logs/all_runs.html... done
Test run failed! Reason:
{'EXIT',{error,{enoent,"src/"}}}
make: *** [tests-ct] Error 2
Hi,
I've simulated a producer/consumer scheme with an ets_buffer of type 'ring'.
Is this (returning {missing_ets_data, buffer_name, -N}) a situation which the reader should tolerate in some way? If so, which way is most correct? Experimentally, I've discovered that if I keep reading, then after some number of attempts (approximately N) I start to receive normal data again.
Or is this still some kind of concurrency bug, such that if this situation occurs the ring buffer must be considered 'broken' and no longer used by the program? The documentation for 'missing_ets_data' is not clear to me.
Sometimes third-party frameworks/libraries use the process dictionary (I'm looking at you, lager). The spawned process should continue running in the stateful context of the originating process, so a portion or all of the process dictionary needs to be copied to the newly spawned process prior to executing the new function.
The safest, fastest way to do this without creating an overly complicated API to handle variations is likely to provide one extra argument consisting of a process dictionary (or subset of one) in list format as would be returned by get(). The calling process needs to construct or fetch the Key/Value list before calling spawn, and then must use a new API spawn entry which interprets the first argument as a process dictionary to be injected to the new process.
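A minimal sketch of that idea (spawn_with_dict is a hypothetical entry point, not the existing cxy API):

```erlang
%% Hypothetical API: the first argument is a process dictionary, as
%% returned by get(), to be injected into the new process before Fun runs.
spawn_with_dict(Dict, Fun) when is_list(Dict), is_function(Fun, 0) ->
    spawn(fun() ->
              [put(K, V) || {K, V} <- Dict],  % replay the captured dictionary
              Fun()
          end).
```

Usage: Pid = spawn_with_dict(get(), fun() -> do_work() end).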
cxy_cache:delete_item is currently used to invalidate a cache entry. This causes a race condition which can leave a stale value in the cache indefinitely.
A safer approach is to create a new function refresh_item which:
Currently remove_task_types deletes a necessary record from the cxy_ctl table and eliminates the ets ring buffer used to record execution timing. Either of these changes can cause a running process that attempts to execute a task to crash with badarg or worse.
Instead, this function should set the Max Active Procs to 0 and wait for all spawns and inline procs to stop before deleting anything from the ets table.
When an oldest generation is dumped, the associated items may have related information that should be cleaned up. The deletion of an ets table should be preceded by spawning a process on the list of elements in the soon-to-be-discarded table. The newly spawned process can be more methodical about working its way through the elements (at the expense of keeping them all in memory longer).
Steps to reproduce (version 1.1.0):
The fount behaviour requires init to be declared taking one argument, but in cxy_fount the init callback is called with the "init args" spread as individual arguments, rather than (what I suspect was intended) a list whose first element is the "Mod_Init_Args" variable (in version 1.1.0, cxy_fount.erl:422).
The catch of error:badarg assumes the ets table is missing, but it could just be a badarg in the apply args list or in the function body. The catch clause should check ets:info([named_table]) before reporting whether there is an ets error or not.
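The distinction could be made roughly like this sketch (safe_apply is a hypothetical name, not the epocxy code):

```erlang
%% Hypothetical sketch: distinguish a missing ets table from a badarg
%% raised by the applied function itself.
safe_apply(Table, M, F, A) ->
    try apply(M, F, A)
    catch error:badarg ->
        case ets:info(Table) of
            undefined -> {error, {missing_ets_table, Table}};
            _Info     -> {error, {badarg_in_task, {M, F, A}}}
        end
    end.
```

ets:info/1 returns undefined only when the table is gone, so a badarg from the task itself is no longer misreported as an ets error.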
Hello!
Anyone has an idea why is this happening? :)
Interactive Elixir (1.12.2) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> :ets_buffer.create(:small, :ring, 6)
:ets_buffer
iex(2)> :ets_buffer.write(:small, "1")
1
iex(3)> :ets_buffer.write(:small, "2")
2
iex(4)> :ets_buffer.write(:small, "3")
3
iex(5)> :ets_buffer.write(:small, "4")
4
iex(6)> :ets_buffer.read(:small, 1)
["1"]
iex(7)> :ets_buffer.read(:small, 1)
["2"]
iex(8)> :ets_buffer.read(:small, 1)
["3"]
iex(9)> :ets_buffer.read(:small, 1)
["4"]
iex(10)> :ets_buffer.read(:small, 1)
[]
iex(11)> :ets_buffer.write(:small, "5")
1
iex(12)> :ets_buffer.write(:small, "6")
2
iex(13)> :ets_buffer.write(:small, "7")
3
iex(14)> :ets_buffer.read(:small, 1)
["5", "6"]
iex(15)> :ets_buffer.read(:small, 1)
{:missing_ets_data, :small, -1}
iex(16)> :ets_buffer.read(:small, 1)
["7"]
iex(17)> :ets_buffer.read(:small, 1)
[]
iex(18)>
On iex line 14 the ring buffer suddenly returns two elements, and on line 15 a :missing_ets_data error.
History requests (and maybe others) use ets:match, which may not be safe in the face of concurrency. Look into using ets:safe_fixtable to prevent concurrency issues when reporting aggregate data.
This function allows the ceiling on the number of concurrent tasks to be changed. If it is lowered below the current number of executing processes, no new spawns will occur until the running processes gradually die off and drop below the new threshold (which may conceivably never happen with very long-lived processes).