luizalabs / shared-memory-dict
A very simple shared memory dict implementation
License: MIT License
There is a PR suggesting the addition of *args and **kwargs to our object initialization, but I would like to discuss some points and possibly postpone this change to a major release, since it can change the method signature.
Some ideas and considerations:
- Accept the arguments in __init__ and use them from the serializer module, like the logging module does.
- There is an iterator param. Why not do the same, but optionally allow defining a size?
- __init__ also helps here, considering it will be one less name.
I would like to hear your opinions @spaceone, @renanivo, @jeanmask, @itheodoro, @thyagomininelli.
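For reference, the proposed signature could forward the extra arguments to the internal dict, mirroring dict's own constructor. A minimal sketch with an illustrative toy class (ToySharedDict and _data are hypothetical names, not the library's actual code):

```python
class ToySharedDict:
    """Toy stand-in for SharedMemoryDict; illustrates the proposed signature."""

    def __init__(self, name, size=1024, *args, **kwargs):
        self.name = name
        self.size = size
        # forward the remaining arguments to dict(), mirroring its constructor
        self._data = dict(*args, **kwargs)


d = ToySharedDict('tokens', 1024, {'a': 1}, b=2)
print(d._data)  # → {'a': 1, 'b': 2}
```

This keeps the positional name/size arguments first, which is why the change is signature-breaking for callers passing them by position.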
I missed some details in the README file regarding common use cases of this project. It would be nice if luizalabs explained how it is using it in prod right now, in order to get more eyes looking at it.
I was able to grow the underlying memory mapped file. In the dict.py
module around line 184ish, I did the following:
# requires `import os` and `import mmap` at the top of dict.py
def _save_memory(self, db: Dict[str, Any]) -> None:
    """Allocate 1.5x more memory when the serialized
    data no longer fits (ValueError).
    """
    data = self._serializer.dumps(db)
    # size of the current memory block
    sz = len(self._memory_block.buf)
    while True:
        try:
            # keep trying to write the data until it fits
            self._memory_block.buf[: len(data)] = data
            break
        except ValueError:
            # grow the backing file by 1.5x and remap it; updating sz here
            # lets repeated growth compound instead of retrying the same size
            sz = int(1.5 * sz)
            os.ftruncate(self.shm._fd, sz)
            self.shm._size = os.fstat(self.shm._fd).st_size
            self.shm._mmap = mmap.mmap(self.shm._fd, sz)
            self.shm._buf = memoryview(self.shm._mmap)
I wrote a simple test script:
from shared_memory_dict import SharedMemoryDict
# try to create a shared memory file of size 1024 bytes
# will reuse existing file if available
smd = SharedMemoryDict(name='tokens', size=1024)
smd['some-key'] = 'some-value-with-any-type'
print("File size before: ", smd.shm.size)
for i in range(400):
    smd[i] = i
print("File size after: ", smd.shm.size)
# clean up
smd.shm.close()
smd.shm.unlink()
which outputs:
File size before: 1024
File size after: 2304
Additionally I fired up a separate Python process in a 2nd terminal window and was able to see all changes. This seems promising, but I haven't tested everything; this may not even be the best approach. Sometimes the two processes appear "out of sync". This usually happens if you abruptly close a process before doing proper cleanup (.close() and .unlink()).
test/root# ./t2.py
test/root# /usr/lib/python3.8/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 1 leaked shared_memory objects to clean up at shutdown
warnings.warn('resource_tracker: There appear to be %d '
t2.py code
#!/usr/bin/env python3
from shared_memory_dict import SharedMemoryDict
import datetime, os, time
smd = SharedMemoryDict(name='tokens', size=1024*10)
ts = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
smd['0005'] = ts
smd.shm.close()
I was testing the behavior of two processes writing to the dict; even with SHARED_MEMORY_USE_LOCK=1, I got exceptions.
Python 3.9.16, macOS.
Run the two Python files in separate terminals with SHARED_MEMORY_USE_LOCK=1 set.
export SHARED_MEMORY_USE_LOCK=1 && python shm_process_safe.py
shm_process_safe.py:
import time
from shared_memory_dict import SharedMemoryDict
smd = SharedMemoryDict(name='tokens', size=1024)
smd['a'] = 'xx'
time.sleep(2)
for i in range(1000000):
    smd['b'] = str(i) + 'yy'
print(smd)
# try:
# smd.shm.close()
# smd.shm.unlink()
# del smd
# except Exception as e:
# pass
in another terminal, run the other python file also with SHARED_MEMORY_USE_LOCK=1 set.
export SHARED_MEMORY_USE_LOCK=1 && python shm_process_safe2.py
shm_process_safe2.py:
import time
from shared_memory_dict import SharedMemoryDict
smd = SharedMemoryDict(name='tokens', size=1024)
print(smd)
time.sleep(1)
# smd['b']= 'yy'
print(smd)
for i in range(1000000):
    smd['b'] = str(i) + 'xx'
print(smd)
# try:
# smd.shm.close()
# smd.shm.unlink()
# del smd
# except Exception as e:
# pass
Result:
Got following errors:
export SHARED_MEMORY_USE_LOCK=1 && python shm_process_safe.py
Traceback (most recent call last):
File "/Users/xx/work/code_snipet/shm/shm_process_safe.py", line 7, in <module>
smd['b'] = str(i) + 'yy'
File "/Users/xx/.virtualenvs/octopus/lib/python3.9/site-packages/shared_memory_dict/dict.py", line 95, in __setitem__
with self._modify_db() as db:
File "/usr/local/Cellar/[email protected]/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/contextlib.py", line 119, in __enter__
return next(self.gen)
File "/Users/xx/.virtualenvs/octopus/lib/python3.9/site-packages/shared_memory_dict/dict.py", line 87, in _modify_db
db = self._read_memory()
File "/Users/xx/.virtualenvs/octopus/lib/python3.9/site-packages/shared_memory_dict/dict.py", line 184, in _read_memory
return self._serializer.loads(self._memory_block.buf.tobytes())
File "/Users/xx/.virtualenvs/octopus/lib/python3.9/site-packages/shared_memory_dict/serializers.py", line 50, in loads
return pickle.loads(data)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x94 in position 6: invalid start byte
/usr/local/Cellar/[email protected]/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 1 leaked shared_memory objects to clean up at shutdown
warnings.warn('resource_tracker: There appear to be %d '
Proposal
Since Python 3.7, dictionary insertion order is guaranteed in every implementation of the language, so I propose using dict instead of OrderedDict. One of the requirements of the project is that the Python version must be greater than or equal to 3.8. As a benefit of this proposal, we get a structure that does not need to be imported in any module and that is also optimized, which can lead to gains in performance and space allocation.
Problems
Even though it inherits from dict, OrderedDict has some methods that are unique or have different signatures. For example:
move_to_end - only present in OrderedDict; for cases where the last parameter is False, implementing it with dict is complex.
popitem - in dict it removes the last element inserted; in OrderedDict it has a last parameter that, when False, removes the first element inserted into the dictionary.
Internally, OrderedDict has a linked-list implementation that favors this insertion and removal at the beginning of the map.
Solution and trade-offs
Inherit from the abc MutableMapping, implementing the missing methods and using a dict by composition. One implication of this solution is incompatibility with the methods mentioned above. To mitigate this, I suggest deprecating the methods that do not respect the dict interface, assuming they are not the main ones.
After reading this comment by Alex Martelli I am even more convinced of this solution.
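A minimal sketch of the proposed design, assuming composition over a plain dict (class and attribute names are illustrative, not the project's code):

```python
from collections.abc import MutableMapping


class ComposedDict(MutableMapping):
    """Sketch: inherit from MutableMapping, hold a plain dict by composition."""

    def __init__(self):
        self._data = {}

    # the five abstract methods MutableMapping requires:
    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value

    def __delitem__(self, key):
        del self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)


cd = ComposedDict()
cd['a'] = 1
# get(), items(), keys(), values(), update(), pop(), etc. come for free
# as MutableMapping mixin methods
print(cd.get('b', 0))  # → 0
```

Notably, move_to_end and the last parameter of popitem would simply not exist on this interface, which is exactly the deprecation being proposed.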
SharedMemoryDict cannot be serialized by json.dumps.
I have to use ._read_memory() to do so.
It seems strange.
Maybe we should add a .dict() method to convert it to ordinary dict?
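A sketch of what such a .dict() method might look like, using a toy stand-in class (ToySMD is illustrative; the real method would presumably just expose _read_memory(), which already returns a plain dict):

```python
import json


class ToySMD:
    """Toy stand-in for SharedMemoryDict (illustration only)."""

    def __init__(self):
        self._db = {'some-key': 'some-value'}

    def _read_memory(self):
        # the real method deserializes the shared memory block
        return dict(self._db)

    def dict(self):
        # the suggested convenience method: expose a plain-dict copy
        return self._read_memory()


smd = ToySMD()
print(json.dumps(smd.dict()))  # → {"some-key": "some-value"}
```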
from shared_memory_dict import SharedMemoryDict
smd = SharedMemoryDict(name='tokens', size=1024)
smd['some-key'] = {}
smd['some-key']['x'] = 1  # this statement is ignored
print(smd)  # output: {'some-key': {}}
Tried on Debian, Python 3.9, main branch:
Python 3.9.2 (default, Feb 28 2021, 17:03:44)
[GCC 10.2.1 20210110] on linux
>>> import shared_memory_dict, timeit
>>> orig = {}
>>> for i in range(100): orig[i] = i
...
>>> len(orig)
100
>>> timeit.timeit('orig[10]', globals=globals())
0.03487025899812579
>>> for i in range(1000): orig[i] = i
...
>>> timeit.timeit('orig[10]', globals=globals())
0.0321930960053578
>>> shared = shared_memory_dict.SharedMemoryDict('some_name', 10000)
>>> for i in range(100): shared[i] = i
...
>>> len(shared)
100
>>> timeit.timeit('shared[10]', globals=globals())
3.8467855940107256
>>> for i in range(1000): shared[i] = i
...
>>> len(shared)
1000
>>> timeit.timeit('shared[10]', globals=globals())
29.410958576016128
Is that the intended or expected behavior?
It looks to me like it is deserializing the whole dict for every single get of a value, even when nothing has ever changed.
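The numbers are consistent with each lookup deserializing the entire pickled dict: the payload, and therefore the per-access cost, grows with the number of keys. A quick way to see the payload growth (exact byte counts vary by Python version):

```python
import pickle

small = pickle.dumps({i: i for i in range(100)}, protocol=pickle.HIGHEST_PROTOCOL)
large = pickle.dumps({i: i for i in range(1000)}, protocol=pickle.HIGHEST_PROTOCOL)
# the 1000-key payload is roughly an order of magnitude larger than the
# 100-key payload, matching the roughly 10x slowdown measured above
print(len(small), len(large))
```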
I wrote a demo, and when I ran it, I found that 10 million additions did not produce the correct result.
I tried using the lock decorator from lock.py to create and use the lock in the child process, instead of creating it in the main process and passing it to the children.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ------------------------------ #
# Try using multiprocessing.Lock as the lock
# ------------------------------ #
from multiprocessing import Process, shared_memory
import time
# Initialize the lock; it is used as a function decorator
import os
from functools import wraps

if os.getenv('SHARED_MEMORY_USE_LOCK') == '1':
    from multiprocessing import Lock
else:
    class Lock:  # type: ignore
        def acquire(self):
            pass

        def release(self):
            pass

_lock = Lock()


def lock(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        _lock.acquire()
        try:
            return func(*args, **kwargs)
        finally:
            _lock.release()
    return wrapper


# ------------------------------ #
@lock
def add(lst, i):
    # read-modify-write under the (per-process!) lock; note the original
    # version took a plain int and never wrote the result back
    lst[i] = lst[i] + 1


def fn1(shmem_name: str, n: int) -> None:
    """
    shmem_name: unique identifier of the shared memory block
    n: number of operations
    """
    # attach to the shared memory block
    shmem_list = shared_memory.ShareableList(name=shmem_name)
    for _ in range(n):
        # increment n times
        add(shmem_list, 0)
    # close the block opened by this child process
    shmem_list.shm.close()


def fn2(shmem_name: str, n: int) -> None:
    """
    shmem_name: unique identifier of the shared memory block
    n: number of operations
    """
    # attach to the shared memory block
    shmem_list = shared_memory.ShareableList(name=shmem_name)
    for _ in range(n):
        # increment n times
        add(shmem_list, 0)
    # close the block opened by this child process
    shmem_list.shm.close()


if __name__ == "__main__":
    # measure the efficiency of two processes incrementing the same counter
    # start time
    start_time = time.time()
    # create the shared memory region; a ShareableList stores the counter
    shmem_list = shared_memory.ShareableList([0])
    shmem_list[0] = 0
    # number of operations (Python allows _ in numbers for readability)
    total = 10_000_000
    # run in two processes, each doing half of the increments
    p1 = Process(target=fn1, args=(shmem_list.shm.name, total // 2))
    p2 = Process(target=fn2, args=(shmem_list.shm.name, total // 2))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    # check whether the result matches the expected total
    print(f"a[{shmem_list[0]}] == total[{total}]")
    # close the block opened by the main process
    shmem_list.shm.close()
    # release the memory block
    shmem_list.shm.unlink()
    # end time
    end_time = time.time()
    # elapsed time
    print(end_time - start_time)
Works fine when using posix_ipc as the lock:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ------------------------------ #
# Try a POSIX semaphore as the inter-process lock
# Runtime (VS Code, non-debug mode):
# a single process doing 5 million operations: 17.25112557411194
# two processes doing 10 million operations:
# 53.944918632507324
# htop shows very high kernel CPU usage (red area): lots of kernel
# switching, and the CPUs are not fully utilized
# ------------------------------ #
import time
import posix_ipc
from multiprocessing import Process, shared_memory

# system-wide named semaphore used as the lock
global_variable = '/my_semaphore_uuid'
# global_variable (str): the name of a named semaphore must start with a
# forward slash (/) so it is treated as a system-wide global resource.
semaphore = posix_ipc.Semaphore(global_variable, flags=posix_ipc.O_CREAT, initial_value=1)
# make sure semaphore.value is 1 at initialization


def fn1(shmem_name: str, n: int) -> None:
    """
    shmem_name: unique identifier of the shared memory block
    n: number of operations
    """
    # attach to the shared memory block
    shmem_list = shared_memory.ShareableList(name=shmem_name)
    for _ in range(n):
        # acquire the lock
        semaphore.acquire()
        # increment atomically; without the lock the result is wrong
        shmem_list[0] += 1
        # release the lock
        semaphore.release()
    # close the block opened by this child process
    shmem_list.shm.close()


def fn2(shmem_name: str, n: int) -> None:
    """
    shmem_name: unique identifier of the shared memory block
    n: number of operations
    """
    # attach to the shared memory block
    shmem_list = shared_memory.ShareableList(name=shmem_name)
    for _ in range(n):
        # acquire the lock
        semaphore.acquire()
        # increment atomically; without the lock the result is wrong
        shmem_list[0] += 1
        # release the lock
        semaphore.release()
    # close the block opened by this child process
    shmem_list.shm.close()


if __name__ == "__main__":
    # measure the efficiency of two processes incrementing the same counter
    # start time
    start_time = time.time()
    # create the shared memory region; a ShareableList stores the counter
    shmem_list = shared_memory.ShareableList([0])
    # number of operations (Python allows _ in numbers for readability)
    total = 10_000_000
    # run in two processes, each doing half of the increments
    p1 = Process(target=fn1, args=(shmem_list.shm.name, total // 2))
    p2 = Process(target=fn2, args=(shmem_list.shm.name, total // 2))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    # check whether the result matches the expected total
    print(f"a[{shmem_list[0]}] == total[{total}]")
    # close the block opened by the main process
    shmem_list.shm.close()
    # release the memory block
    shmem_list.shm.unlink()
    # end time
    end_time = time.time()
    # elapsed time
    print(end_time - start_time)
Of course, use across Python interpreters or across languages is a minority requirement; this is just a remark.
Update dict.py to check whether a dictionary key exists.
Ensure the dictionary key exists so that the common .get() method with a default works when the key does not exist. The current code throws a KeyError even when using smd.get('non-exist-key', 'default_value').
/usr/lib/python3.8/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 1 leaked shared_memory objects to clean up at shutdown
warnings.warn('resource_tracker: There appear to be %d '
If you don't mind, I fixed it locally and will contribute my fix.
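A hedged sketch of the kind of fix being described, using a toy stand-in class (ToyDict and _db are illustrative, not the library's actual code): look the key up in the deserialized snapshot and fall back to the default instead of raising KeyError.

```python
class ToyDict:
    """Toy stand-in for SharedMemoryDict (illustration only)."""

    def __init__(self):
        self._db = {'a': 1}

    def _read_memory(self):
        # the real method deserializes the shared memory block
        return dict(self._db)

    def get(self, key, default=None):
        # delegate to dict.get on the snapshot so a missing key
        # returns the default instead of raising KeyError
        return self._read_memory().get(key, default)


td = ToyDict()
print(td.get('non-exist-key', 'default_value'))  # → default_value
```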
Steps:
How can I set the environment variable SHARED_MEMORY_USE_LOCK=1, please?
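For what it's worth, on Linux/macOS the variable can be set in the shell before starting Python, either for a single command or for the whole session:

```shell
# set it just for one command
SHARED_MEMORY_USE_LOCK=1 python -c 'import os; print(os.environ["SHARED_MEMORY_USE_LOCK"])'

# or export it for the rest of the shell session
export SHARED_MEMORY_USE_LOCK=1
python -c 'import os; print(os.environ["SHARED_MEMORY_USE_LOCK"])'
```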
Hello,
it would be nice if the README would describe the reason for this project.
As there is already multiprocessing.Manager().dict() in the stdlib, which does the same?!
Is this significantly easier or faster than the multiprocessing library?
I want to create a shared dict with several million items. With a normal Python dict, adding elements in a loop works well (~300k elements per second); with shared-memory-dict it is ~1000x slower. Is there any way to quickly build a big shared dict?
.items(), .keys(), and .values() are present but return an odd result.
In [11]: smd.values()
Out[11]: odict_values([])
How should I properly close the shared memory?
My processes end with:
/usr/lib/python3/dist-packages/multiprocessing/resource_tracker.py:226: UserWarning: resource_tracker: There appear to be 3 leaked shared_memory objects to clean up at shutdown
(len(rtype_cache), rtype))
smd['some-key'] = 'some-value-with-any-type'
If this value is a dictionary, then it won't work.
We run the following code:
smd = SharedMemoryDict(name='my_smd', size=20480)
smd['key'] = {}
smd['key']['key2'] = 'some-value'
We hope to get the following results:
{'key': {'key2': 'some-value'}}
But we can only get:
{'key': {}}
"Key2" and its value will not appear in the result.
Hi,
It seems when I do the unlink (smd.shm.unlink()) the shared memory is not destroyed.
First shell
>>> from shared_memory_dict import SharedMemoryDict
>>> smd = SharedMemoryDict(name='tokens', size=1024)
>>> smd['key1'] = 'value1'
>>> smd['key2'] = 'value2'
>>> smd
{'key1': 'value1', 'key2': 'value2'}
>>> smd.shm.unlink()
>>> smd
{'key1': 'value1', 'key2': 'value2'}
>>>
Second shell
>>> from shared_memory_dict import SharedMemoryDict
>>> smd = SharedMemoryDict(name='tokens', size=1024)
>>> smd
{'key1': 'value1', 'key2': 'value2'}
>>>
How to solve this?
The manual says:
unlink()
Requests that the underlying shared memory block be destroyed. In order to ensure proper cleanup of resources, unlink() should be called once (and only once) across all processes which have need for the shared memory block. After requesting its destruction, a shared memory block may or may not be immediately destroyed and this behavior may differ across platforms.
may or may not be immediately destroyed? It doesn't get destroyed at all? (platform is Windows)
The README should mention that the contents of the shared memory dict can be hijacked by any user which knows the name of the shared memory.
Getting the name is as simple as ls /dev/shm.
To hijack this, one must simply create a SharedMemoryDict instance before the to-be-hijacked process starts and set the /dev/shm file world-readable and writable.
the module needs Python >= 3.8 ;-(
Python 3.7 is the base on Raspberry Pi Debian, so this module can't be pip-installed on it.
It seems that there are no Python 3.8-specific dependencies needed.
Should it be possible to support Python 3.7 too?
Is it possible that the size dynamically grows?
The docs should mention that the size is simply bytes.
The content of the shared memory file is a pickle file with pickle.HIGHEST_PROTOCOL.
So each entry needs at least 5 bytes.
The integer 9223372036854775807 is 22 bytes.
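One way to estimate a suitable size is to measure the pickled payload directly (exact byte counts vary with the pickle protocol and Python version, so treat the result as an approximation):

```python
import pickle

payload = {'some-key': 'some-value-with-any-type'}
serialized = pickle.dumps(payload, protocol=pickle.HIGHEST_PROTOCOL)
# the shared memory block must be at least this many bytes
print(len(serialized))
```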
this message is only a test!
Steps:
A bug occurs when initializing memory using a hook and then attaching to the same shared memory through SharedMemoryDict.
How to reproduce:
from shared_memory_dict.hooks import create_shared_memory
from shared_memory_dict import SharedMemoryDict
create_shared_memory(name="test", size=64)
existing_smd = SharedMemoryDict(name='test', size=64)
print(existing_smd)
Traceback (most recent call last):
File "/home/cassiobotaro/projects/shared-memory-dict/shared_memory_dict/serializers.py", line 50, in loads
return pickle.loads(data)
_pickle.UnpicklingError: invalid load key, '\x00'.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "", line 1, in
File "/home/cassiobotaro/projects/shared-memory-dict/shared_memory_dict/dict.py", line 137, in str
return str(self._read_memory())
File "/home/cassiobotaro/projects/shared-memory-dict/shared_memory_dict/dict.py", line 184, in _read_memory
return self._serializer.loads(self._memory_block.buf.tobytes())
File "/home/cassiobotaro/projects/shared-memory-dict/shared_memory_dict/serializers.py", line 52, in loads
raise DeserializationError(data)
shared_memory_dict.serializers.DeserializationError: Failed to deserialize data: b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
Configure black to avoid string normalization (default is double quotes) since we are adopting single quotes in the project.
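Assuming black is configured via pyproject.toml, the relevant option would be something like the following (a sketch; verify the key against the black version in use):

```toml
[tool.black]
skip-string-normalization = true
```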