bsc-s2 / pykit Goto Github PK
View Code? Open in Web Editor NEWa collection of toolkit lib for distributed system development in python
License: MIT License
a collection of toolkit lib for distributed system development in python
License: MIT License
pykit.http.Client.request(**argkv, body=None)
参考: coding上s2 项目里的 src/mime.py
参考: pykit项目中的aws模块, 关于加载外部资源的写法.
桐伟开发, 义谱帮忙指导下.
Port it from s2 and import pykit.modutil in s2.
#!/usr/bin/env python2
# coding: utf-8
import importlib
import os
import pkgutil
import stat
import sys
def submodules(root_mod):
    """Return a dict mapping submodule name -> imported module object
    for every direct submodule of the package `root_mod`.

    Returns None if `root_mod` is not a package (its __file__ is not an
    __init__.py/__init__.pyc), since only packages have submodules.
    """
    mod_path = root_mod.__file__
    fn = os.path.basename(mod_path)
    pathname = os.path.dirname(mod_path)

    # Only packages (directories with __init__) can contain submodules.
    if fn not in ("__init__.py", "__init__.pyc"):
        return None

    rst = {}
    for _finder, name, _ispkg in pkgutil.iter_modules([pathname]):
        # importlib.import_module is the supported import API; the old
        # finder.find_module(...).load_module(...) pair is deprecated and
        # removed in modern Python.
        rst[name] = importlib.import_module(root_mod.__name__ + "." + name)

    return rst
def submodule_tree(root):
    """Recursively build a tree of submodules under `root`.

    Each node is {"mod": <module>, "children": <subtree-or-None>}.
    Returns None when `root` is not a package.
    """
    mods = submodules(root)
    if mods is None:
        return None

    return {
        name: {"mod": mod, "children": submodule_tree(mod)}
        for name, mod in mods.items()
    }
# Return a dictionary with non-leaf node representing module path and leaf node
# referencing to module.
# Thus non-leaf module would not be returned.
# {
# "s2role": {
# "Front": {
# "python": { "nodeAPI": <module> },
# }
# }
#}
def submodule_leaf_tree(root):
    """Build a nested dict where non-leaf keys are package path segments
    and leaf values are the module objects themselves.

    Non-leaf (package) modules are not returned directly; only leaf
    modules appear as values. Returns None when `root` is not a package.
    """
    mods = submodules(root)
    if mods is None:
        return None

    tree = {}
    for name, mod in mods.items():
        sub = submodule_leaf_tree(mod)
        # A plain module has no children: store the module itself.
        tree[name] = mod if sub is None else sub
    return tree
目前默认行为是没有找到记录的话从开头读取.
通过default_seek
参数指定:
None
或 start
: 从头开始读.end
从结尾开始读.需要增加这个参数的函数是 cat
和 iterate
.
实现 shutil.rmtree
def rmtree(path, ignore_errors=False, onerror=None):
    """Recursively delete a directory tree.

    If ignore_errors is set, errors are ignored; otherwise, if onerror
    is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is os.listdir, os.remove, or os.rmdir;
    path is the argument to that function that caused it to fail; and
    exc_info is a tuple returned by sys.exc_info(). If ignore_errors
    is false and onerror is None, an exception is raised.
    """
    if ignore_errors:
        def onerror(*args):
            pass
    elif onerror is None:
        def onerror(*args):
            # Re-raise the exception currently being handled.
            raise

    try:
        if os.path.islink(path):
            # symlinks to directories are forbidden, see bug #1669
            raise OSError("Cannot call rmtree on a symbolic link")
    except OSError:
        onerror(os.path.islink, path, sys.exc_info())
        # can't continue even if onerror hook returns
        return

    names = []
    try:
        names = os.listdir(path)
    except OSError:
        # Fix: `except os.error, err` is Python-2-only syntax and bound an
        # unused name; OSError (== os.error) works on py2.6+ and py3.
        onerror(os.listdir, path, sys.exc_info())

    for name in names:
        fullname = os.path.join(path, name)
        try:
            mode = os.lstat(fullname).st_mode
        except OSError:
            mode = 0
        if stat.S_ISDIR(mode):
            rmtree(fullname, ignore_errors, onerror)
        else:
            try:
                os.remove(fullname)
            except OSError:
                onerror(os.remove, fullname, sys.exc_info())

    try:
        os.rmdir(path)
    except OSError:
        onerror(os.rmdir, path, sys.exc_info())
兼容:可以删除 file、link
描述还没完成 😢
convert(dict_or_list_of_dict, key_path, value_path)
convert({ 1: "x", 2: "y"}, [1,2], [2])
# 使用foo[1]的值作为第一级的key, 使用foo[2]作为第2级的key;
# 输出:
{ "x": { "y": "y" } }
convert({ 1: "x", 2: { 3: "z", 4: "w" }}, [(2,3), 1], [2])
# 使用 foo[2][3] 的值作为第一级的key. 使用 foo[1] 的值作为第2级的key;
# 输出:
{ "z": { "x": { 3: "z", 4: "w" }}}
如果某函数运行比较消耗资源,
防止同时访问时启动多个函数的实例。
看如果没有用到也可以用iterate(..., timeout=99999999) 替代的话就去掉吧.
任务队列中有用到。task表中开始设计的consumer_id是个bigint类型,开发的时候想着这个字段保存consumer的ip方便debug,就这样做了一个转化
add dictutil.is_sub method
use a configuration like this:
{
"db": which database to sharding. a stirng.
"table": which table to sharding. a string.
"conn": {"host", "port", "user", "passwd"},
"shard_fields": those fileds to sharding by, a list or tuple.
"first_shard": first shard values in the table, a list or tuple.
"number_per_shard": how many rows of data a shard can contain and its tolerance. a list or tuple like (number, number).
"sharding_generator": a function that valid sharding format use a `shard` ad argument.
}
generate sharding info to a file like this:
{
"shard": sharding info, [(), (), ...],
"number": numbers of rows in every shard, [number, number, ...],
"all": number of all the rows in every shard. a number.
}
def argssplit(argstr):
    """Split a command-line style string into a list of arguments.

    Tokens are separated by spaces; a token wrapped in single or double
    quotes may contain spaces, and the quotes are stripped from the
    result. An unterminated quote consumes the rest of the string.
    """
    s = argstr.strip()
    n = len(s)
    parts = []
    pos = 0

    while pos < n:
        start = pos
        if s[start] in ('"', "'"):
            # Quoted token: take everything up to the matching quote.
            quote = s[start]
            pos += 1
            while pos < n and s[pos] != quote:
                pos += 1
            parts.append(s[start + 1:pos])
            pos += 1  # step past the closing quote
        else:
            # Bare token: runs until the next space.
            while pos < n and s[pos] != ' ':
                pos += 1
            parts.append(s[start:pos])

        # Swallow the separating run of spaces.
        while pos < n and s[pos] == ' ':
            pos += 1

    return parts
def break_line(str, width):
将1个字符串按照不超过指定的width的宽度增加换行, 只在空格处可以增加换行. 不允许拆分单词.
break_line('foo bar bar.', 9)
# foo bar
# bar
参考: s2: errlog.py: split_len
解决多线程+多进程时伪死锁的问题:
http://drmingdrmer.github.io/tech/programming/2017/11/20/python-concurrent-popen.html
合并到主干后要对所有集群升级, 安装subprocess32
根据yaml配置,校验参数
origin:
s2command.py rename -> command
s2sh.py rename -> shell
涉及到的几个函数:
上面这几个函数现在key_path
参数是dot分隔的string, 像: "foo.bar"
, 表示: my_dict["foo"]["bar"]
.
需要支持tuple/list格式的key_path
, 以支持更复杂的字典访问, 类似: ("foo", "bar")
表示: my_dict["foo"]["bar"]
mysqlutil.sql_dump_between_shards(shard_fileds, dbinfo, tablename, sql_path, bin_path, start, end=None)
# default
sql_dump_between_shards(
['bucket_id', 'scope', 'key'],
{
'host':'127.0.0.1',
'user':'root',
'password':'pw',
'port':3600,
'db':'mysql'
},
'key',
['tmp', 'key.sql'],
['mysqldump'],
(100000000, 'a', 'key_foo'),
(200000000, 'a', 'key_bar')
)
# " mysqldump --host=127.0.0.1 --port=3600 --user=user --password='pw' mysql key -w
# (`bucket_id`=100000000 AND `scope`='a' AND `key`>='key_foo') OR
# (`bucket_id`=100000000 AND `scope`>'a') OR
# (`bucket_id`>100000000 AND `bucket_id`<200000000) OR
# (`bucket_id`=200000000 AND `scope`<'a') OR
# (`bucket_id`=200000000 AND `scope`='a' AND `key`<'key_bar')
# > tmp/key.sql"
# demo.py
from pykit import shell
arguments = {
'echo_repr': (
lambda *x: sys.stdout.write(repr(x)),
{'x': {'nargs': '+', 'help': 'just an input message'}},
),
'foo': {
'bar': lambda: sys.stdout.write('bar'),
'bob': {
'plus': (
lambda x, y: sys.stdout.write(x + y),
{'x': {'nargs': 1, 'type': int, 'help': 'an int is needed'}},
{'y': {'nargs': 1, 'type': int, 'help': 'an int is needed'}},
),
},
},
'__add_help__': {
('echo_repr',) : 'output what is input.',
('foo', 'bar',) : 'print a "bar".',
('foo', 'bob', 'plus') : 'do addition operation with 2 numbers.',
},
'__description__': 'this is an example command.',
}
shell.command(**arguments)
then you can get help message like:
$ python demo.py -h
---------------------------
usage: this is an example command. [-h] {echo_repr, foo bar, foo bob plus} ...
positional arguments:
{echo_repr, foo bar, foo bob plus} commands...
echo_repr output what is input.
foo bar print a "bar".
foo bob plus do addition operation with 2 numbers.
optional arguments:
-h, --help show this help message and exit
$ python demo.py foo bob plus -h
--------------------------
usage: this is an example command. foo bob plus [-h] x y
positional arguments:
x an int is needed
y an int is needed
optional arguments:
-h, --help show this help message and exit
合并时 直接修改dst.
others 必须都是dict或子类.
mode=("keep"|"replace"|"merge"|"forcemerge") 来指定others[i]的key已经存在dst中时的行为: keep保留dst的key. replace直接替换dst中的key. merge或forcemerge来递归的用others[i]中的key 合并覆盖 dst的key. 要求只有dict可以互相合并. 对forcemerge, 如果dst[key]不是字典直接用others[i][key] 替换. 对merge, 抛出异常.
def write_file(..., atomic=None)
实现参考s2 代码fs.py:
# Serializes writers within this process.
lock = threading.RLock()


def atomic_write_file(fn, fcont):
    """Atomically replace `fn` with content `fcont`.

    The content is first written to a temporary sibling file, then moved
    over `fn` with os.rename() (atomic on POSIX), so readers never see a
    partially written file. Ownership comes from conf.s2uid/conf.s2gid.
    """
    with lock:
        # pid + content hash keeps temp names distinct across processes.
        tmp = "%s._tmp_.%s%s" % (fn, os.getpid(), hash(fcont))
        fsutil.write_file(tmp, fcont, uid=conf.s2uid, gid=conf.s2gid)
        os.rename(tmp, fn)
add doc of functions:
merge function: utf8str
and to_output_format
mysqlutil.sql_scan_index(table, result_fields, index_fields, start, left_open=False, limit=1024, index_name=None)
# default
sql_scan_index("foo", ['_id', 'key'], ['key', 'val'], ["a", "b"])
# "SELECT `_id`, `key` FROM `foo` FORCE INDEX (`idx_key_val`) WHERE `foo`.`key` = "a" AND `foo`.`val` >= "b" LIMIT 1024";
# only one index specified
sql_scan_index("foo", ['_id', 'key'], ['key', 'val'], ["a"])
# "SELECT `_id`, `key` FROM `foo` FORCE INDEX (`idx_key_val`) WHERE `foo`.`key` >= "a" LIMIT 1024";
# specify index name: index_name="bar"
sql_scan_index("foo", ['_id', 'key'], ['key', 'val'], ["a"], index_name="bar")
# "SELECT `_id`, `key` FROM `foo` FORCE INDEX (`bar`) WHERE `foo`.`key` >= "a" LIMIT 1024";
# left open: do not include the first row: left_open=True
sql_scan_index("foo", ['_id', 'key'], ['key', 'val'], ["a"], index_name="bar", left_open=True)
# "SELECT `_id`, `key` FROM `foo` FORCE INDEX (`bar`) WHERE `foo`.`key` > "a" LIMIT 1024";
# specify database: ("mydb")
sql_scan_index(("mydb","foo"), ['_id', 'key'], ['key', 'val'], ["a"], index_name="bar", left_open=True)
# "SELECT `_id`, `key` FROM `mydb`.`foo` FORCE INDEX (`bar`) WHERE `mydb`.`foo`.`key` > "a" LIMIT 1024";
接口类似 sql_scan_index(). 返回一个generator.
用来扫描一张表所有数据.
def calc_checksum(path, sha1=True, md5=True, crc32=True, block_size=32*1024**2, iolimit=32*1024**2)
计算本地磁盘上文件的sha1, md5, 或crc32(1个或多个).
可以指定1次文件读取的块大小(默认32MB), 也可以指定io开销的限制(默认1秒32MB).
参考: s2 项目中 fs.py: get_file_checksums()
a = RangeSet([[1,7]])
b = RangeSet([[2,4]])
a.substract(b) # [[1,2], [4,7]]
a.intersect(b) # [[2,4]]
a.union(b) # [[1,7]]
RangeSet([[1,2],[5,7]], ).union([[3,6]]) # [[1,2], [3,7]]
如果网上有现成实现可以拿来用. 没有找到的话需要添加这个功能.
目前在s2/代码中group 的迁移中需要用到记录所有已经操作过的group_id的集合. mysql-devops项目中需要用来计算提交的事务的id的集合.
def makedirs(*paths, **kwargs):
    """Create a directory from the given path segments.

    Keyword arguments: mode (default 0o755), uid, gid.
    NOTE(review): the snippet quoted in this issue is truncated — only
    the keyword extraction is visible here.
    """
    mode = kwargs.get('mode', 0o755)  # 0o spelling is valid on py2.6+ and py3
    uid = kwargs.get('uid')
    # Bug fix: this previously read kwargs.get('uid'), silently ignoring
    # the caller's gid argument.
    gid = kwargs.get('gid')
gid = kwargs.get('uid'), here should be 'gid'?
def write_file(path, fcont, uid=None, gid=None, atomic=False):
    """Write `fcont` to `path`, optionally atomically.

    With atomic=True the content goes to a unique temporary file next to
    the target first and is then moved into place with os.rename(), so
    readers never observe a partially written file. uid/gid are passed
    through to _write_file.
    """
    if not atomic:
        return _write_file(path, fcont, uid, gid)

    # pid + nanosecond timestamp keeps temp names unique per writer.
    tmp_path = '%s._tmp_.%s_%s' % (path, os.getpid(), timeutil.ns())
    _write_file(tmp_path, fcont, uid, gid)
    os.rename(tmp_path, path)
mysqlutil.sql_condition_between_shards(index_fields, shard, next_shard)
# default
sql_condition_between_shards([bucket_id, scope, key], (100000000, 'a', 'key_foo'), (200000000, 'a', 'key_bar'))
# ["`bucket_id`=100000000 AND `scope`='a' AND `key`>='key_foo'",
# "`bucket_id`=100000000 AND `scope`>'a'",
# "`bucket_id`>100000000 AND `bucket_id`<200000000",
# "`bucket_id`=200000000 AND `scope`<'a'",
# "`bucket_id`=200000000 AND `scope`='a' AND `key`<'key_bar'",]
# first index_field equals
sql_condition_between_shards([bucket_id, scope, key], (100000000, 'a', 'key_foo'), (100000000, 'b', 'key_bar'))
# ["`bucket_id`=100000000 AND `scope`='a' AND `key` >= 'key_foo'",
# "`bucket_id`=100000000 AND `scope`>'a' AND `scope`<'b'",
# "`bucket_id`=100000000 AND `scope` = 'b' AND `key` < 'key_bar'",]
# the first two index_field equals
sql_condition_between_shards([bucket_id, scope, key], (100000000, 'a', 'key_foo'), (100000000, 'a', 'key_bar'))
# ["`bucket_id`=100000000 AND `scope`='a' AND `key` >= 'key_foo' AND `key` < 'key_bar'"]
用来将上次记录的读取文件的位置信息清空.
fs.py add file handle api
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.