import re
import cloudpickle
import hashlib
import h5io
import h5py
import numpy as np
import queue
import os
import time
import queue
from threading import Thread
from concurrent.futures import Future, Executor
from pympipool.shared import cancel_items_in_queue
def get_hash(binary):
    """Return a deterministic MD5 hex digest of ``binary``.

    The temporary Jupyter kernel path (``/ipykernel_<id>/``) changes between
    sessions, so the id is stripped before hashing to keep the hash
    reproducible across interpreter restarts.

    Args:
        binary (bytes): serialized payload to hash.

    Returns:
        str: 32-character hexadecimal MD5 digest.
    """
    # Remove the session-specific ipykernel id so the hash is deterministic.
    binary_no_ipykernel = re.sub(b"(?<=/ipykernel_)(.*)(?=/)", b"", binary)
    # hexdigest() already returns str — the redundant str() wrapper was removed.
    return hashlib.md5(binary_no_ipykernel).hexdigest()
def serialize_funct_h5(fn, *args, **kwargs):
    """Serialize a function call for HDF5 caching.

    Args:
        fn (callable): function to execute; must expose ``__name__``.
        *args: positional arguments of the call.
        **kwargs: keyword arguments of the call.

    Returns:
        tuple: ``(task_key, data)`` where ``task_key`` is the function name
        plus the hash of the complete call, and ``data`` holds the pickled
        function alongside the raw args/kwargs.
    """
    # Hash the function together with its arguments so that each distinct
    # call maps to its own cache key.
    call_blob = cloudpickle.dumps({"fn": fn, "args": args, "kwargs": kwargs})
    task_key = fn.__name__ + get_hash(binary=call_blob)
    payload = {"fn": cloudpickle.dumps(fn), "args": args, "kwargs": kwargs}
    return task_key, payload
def apply_funct(apply_dict):
    """Execute the stored function call.

    Args:
        apply_dict (dict): mapping with keys ``"fn"`` (callable),
            ``"args"`` (sequence) and ``"kwargs"`` (mapping).

    Returns:
        Whatever ``apply_dict["fn"](*args, **kwargs)`` returns.
    """
    # Call the function directly instead of going through the
    # non-idiomatic explicit ``__call__``.
    return apply_dict["fn"](*apply_dict["args"], **apply_dict["kwargs"])
def get_result_h5(task_key):
    """Return the result for ``task_key``, computing and caching it on demand.

    Reads ``<task_key>.h5``: if an ``"output"`` entry exists, it is returned
    directly; otherwise the pickled function is loaded, executed with the
    stored arguments, and the result is written back to the file before being
    returned.

    Args:
        task_key (str): cache key; also the HDF5 file name stem.

    Returns:
        The cached or freshly computed result of the serialized call.

    Raises:
        TypeError: if the file contains neither an output nor any input
            arguments (nothing to execute).
    """
    file_name = task_key + ".h5"
    with h5py.File(file_name, "r") as hdf:
        if "output" in hdf:
            return h5io.read_hdf5(fname=hdf, title="output", slash="ignore")
        has_args = "input_args" in hdf
        has_kwargs = "input_kwargs" in hdf
        if not has_args and not has_kwargs:
            raise TypeError
        # The three previous branches differed only in which inputs were
        # present — read each one with an empty default instead.
        result = apply_funct(
            apply_dict={
                "fn": cloudpickle.loads(
                    h5io.read_hdf5(fname=hdf, title="function", slash="ignore")
                ),
                "args": (
                    h5io.read_hdf5(fname=hdf, title="input_args", slash="ignore")
                    if has_args
                    else []
                ),
                "kwargs": (
                    h5io.read_hdf5(fname=hdf, title="input_kwargs", slash="ignore")
                    if has_kwargs
                    else {}
                ),
            }
        )
    # Write outside the read context so the file is reopened in append mode
    # without holding a concurrent read handle on the same HDF5 file.
    write_to_h5_file(task_key=task_key, data_dict={"output": result})
    return result
def write_to_h5_file(task_key, data_dict):
    """Persist entries of ``data_dict`` into the HDF5 file ``<task_key>.h5``.

    Recognized keys and their on-disk titles:
    ``"fn"`` -> ``function`` (stored as an opaque byte blob),
    ``"args"`` -> ``input_args``,
    ``"kwargs"`` -> one entry ``input_kwargs/<name>`` per keyword,
    ``"output"`` -> ``output``. Unrecognized keys are ignored.

    Args:
        task_key (str): cache key; also the HDF5 file name stem.
        data_dict (dict): subset of the keys listed above.
    """
    with h5py.File(task_key + ".h5", "a") as store:
        if "fn" in data_dict:
            # np.void wraps the pickled bytes so h5io stores them verbatim.
            h5io.write_hdf5(
                fname=store,
                data=np.void(data_dict["fn"]),
                overwrite="update",
                title="function",
            )
        if "args" in data_dict:
            h5io.write_hdf5(
                fname=store,
                data=data_dict["args"],
                overwrite="update",
                title="input_args",
                slash="ignore",
            )
        if "kwargs" in data_dict:
            # Each keyword argument becomes its own sub-entry of input_kwargs.
            for name, value in data_dict["kwargs"].items():
                h5io.write_hdf5(
                    fname=store,
                    data=value,
                    overwrite="update",
                    title="input_kwargs/" + name,
                    slash="ignore",
                )
        if "output" in data_dict:
            h5io.write_hdf5(
                fname=store,
                data=data_dict["output"],
                overwrite="update",
                title="output",
                slash="ignore",
            )
def execute_tasks_h5(future_queue):
    """Worker loop: execute cached tasks from the queue until shutdown.

    Each queue item is either ``{"shutdown": True, ...}``, which terminates
    the loop, or a single-entry ``{task_key: Future}`` dict whose Future is
    resolved with the (possibly cached) result from ``<task_key>.h5``.

    Args:
        future_queue (queue.Queue): queue filled by ``FileExecutor``.
    """
    while True:
        # Blocking get() instead of the previous get_nowait()/pass loop,
        # which busy-waited and pinned a CPU core while the queue was empty.
        task_dict = future_queue.get()
        if task_dict.get("shutdown", False):
            break
        key = next(iter(task_dict))
        future = task_dict[key]
        # Skip futures that were cancelled (e.g. via shutdown(cancel_futures=True)).
        if not future.done() and future.set_running_or_notify_cancel():
            future.set_result(get_result_h5(task_key=key))
def reload_previous_futures(future_dict, task_queue):
    """Rediscover cached tasks from ``*.h5`` files in the working directory.

    For every HDF5 cache file found, ensure a Future exists in
    ``future_dict`` and enqueue the task so the worker thread re-publishes
    its result.

    Args:
        future_dict (dict): mapping of task key to Future; updated in place.
        task_queue (queue.Queue): queue consumed by the worker thread.
    """
    for file_name in os.listdir():
        if file_name.endswith(".h5"):
            # Strip only the trailing extension (the old split(".h5")[0]
            # also truncated keys that happened to contain ".h5").
            key = file_name[: -len(".h5")]
            # Bug fix: the previous code looked up future_dict[key], which
            # raised KeyError because the dict is empty at startup — create
            # the Future for rediscovered tasks instead.
            if key not in future_dict:
                future_dict[key] = Future()
            task_queue.put({key: future_dict[key]})
class FileExecutor(Executor):
    """Executor that caches every task's inputs and result in an HDF5 file.

    Results are memoized both in memory (one Future per task key) and on
    disk (``<task_key>.h5`` in the current working directory), so resubmitting
    the same call — even after a restart — returns the cached result.
    """

    def __init__(self):
        self._task_queue = queue.Queue()
        # task_key -> Future for every task submitted or rediscovered on disk.
        self._memory_dict = {}
        # Re-enqueue tasks whose .h5 cache files survive from a previous run.
        reload_previous_futures(
            future_dict=self._memory_dict, task_queue=self._task_queue
        )
        self._process = Thread(target=execute_tasks_h5, args=(self._task_queue,))
        self._process.start()

    def submit(self, fn, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)`` and return its Future.

        Identical calls (same function and arguments) share one task key and
        therefore one Future — the work is only written and enqueued once.
        """
        task_key, data_dict = serialize_funct_h5(fn, *args, **kwargs)
        if task_key not in self._memory_dict.keys():
            write_to_h5_file(task_key=task_key, data_dict=data_dict)
            self._memory_dict[task_key] = Future()
            self._task_queue.put({task_key: self._memory_dict[task_key]})
        return self._memory_dict[task_key]

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Stop the worker thread.

        Args:
            wait (bool): block until the worker thread has exited.
            cancel_futures (bool): cancel all queued, not-yet-started tasks.
        """
        if cancel_futures:
            cancel_items_in_queue(que=self._task_queue)
        self._task_queue.put({"shutdown": True, "wait": wait})
        # Bug fix: honor the Executor.shutdown contract — only block on the
        # worker when the caller asked to wait (previously join() always ran).
        if wait:
            self._process.join()