A program template for processing data concurrently. It uses Python 3 built-in modules such as multiprocessing, threading, and asyncio, plus the third-party module aioprocessing.
In this program, one process generates test data and puts it on multiple message queues, one for each worker process. Each worker process gets data from its own message queue, processes it, and puts the processed data on a shared message queue (aioprocessing [1]). Another single process gets data from the shared message queue and does something with it, such as exporting.
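The flow described above can be sketched as a fan-out/fan-in queue topology. This is a minimal illustration, not the project's code: threads and `queue.Queue` stand in for processes and multiprocessing queues so that the sketch runs inside one interpreter, and `N_WORKERS`, `STOP`, and all function names are hypothetical.

```python
import queue
import threading

N_WORKERS = 3   # hypothetical; the project derives this from CPU cores
STOP = None     # sentinel meaning "no more data"


def generate(lines, n_items):
    """Producer: spread test data over the per-worker queues, then signal the end."""
    for i in range(n_items):
        lines[i % len(lines)].put(i)
    for line in lines:
        line.put(STOP)


def work(line, shared):
    """Worker: read from its own queue, process, publish to the shared queue."""
    while (item := line.get()) is not STOP:
        shared.put(item * item)  # stand-in for real processing
    shared.put(STOP)


def export(shared, n_workers, results):
    """Exporter: collect from the shared queue until every worker has finished."""
    finished = 0
    while finished < n_workers:
        item = shared.get()
        if item is STOP:
            finished += 1
        else:
            results.append(item)


def run_pipeline(n_items=10):
    lines = [queue.Queue() for _ in range(N_WORKERS)]
    shared = queue.Queue()
    results = []
    threads = [threading.Thread(target=generate, args=(lines, n_items))]
    threads += [threading.Thread(target=work, args=(line, shared)) for line in lines]
    threads.append(threading.Thread(target=export, args=(shared, N_WORKERS, results)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

Results arrive on the shared queue in whatever order the workers finish, so a consumer should not rely on ordering.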
The overview diagram of this project:
To make the explanation easier to follow, I map the architecture onto a real-world factory. The whole data-processing flow can be seen as a production process, and I rename the objects after their real-world counterparts. I will now introduce the project the way a company is introduced at a roadshow.
The organization structure
Description of the departments and jobs:
- CEO:
  - Sets the number of workshops according to the available resources (CPU cores).
  - Creates a production line for every workshop (thread-safe synchronous message queue) and a transportation line (thread-safe asynchronous message queue).
  - Starts the workshops, the Purchasing Department, the Operation Department, and the Transportation Department.
- Purchasing Department:
  - Buyer:
    - Purchases raw material and attaches a product id (pid) tag. Several different raw materials may belong to one product (they have the same pid).
    - Puts the raw material on the production line it belongs to.
- Production Department:
  - workshop_0:
    - Director: every workshop has a director.
      - Manages the Vice Director and the Monitor.
      - Gets material from the production line and assigns it to a worker according to its 'pid' in the worker roster.
      - Signs a contract [2] with a new worker, and renews the contract of an existing worker when new material belonging to that worker arrives.
    - Vice Director:
      - Takes half of the workshop's resources (GIL) to manage workers and inspectors.
      - Allocates resources only to workers that have material.
    - Worker:
      - Every worker has an exclusive conveyor belt and gets material from it.
      - Assembles the product according to the product blueprint.
      - Attaches the product class (pclass) tag and puts the product on the transportation line.
      - Every worker has a contract. Each time new material arrives, the Director renews it. If the worker gets no new material before the contract expires, the worker is fired.
    - Inspector:
      - Each inspector periodically inspects one worker and does something.
    - Monitor:
      - Reports directly to the Director and is accountable to the Director.
      - Checks the worker roster periodically.
  - workshop_1:
  - ...
  - workshop_n:
- Transportation Department:
  - Dispatcher:
    - Manages the stevedore and the driver.
    - Manages the warehouse.
  - Stevedore:
    - Gets products from the transportation line.
    - Does something with them, such as packaging (pandas).
    - Puts them in the warehouse according to product class (pclass).
  - Driver:
    - Periodically gets all products from the warehouse.
- Operation Department:
  - Ops:
    - Monitors all production lines periodically.
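The Buyer and Director roles can be sketched as two routing steps keyed on pid. The README does not say how a material's production line is chosen, so the hash-based rule below is an assumption, and `pick_line` and `Director.assign` are hypothetical names. `zlib.crc32` is used instead of the built-in `hash()` because string hashes are randomized per process by default, and separate processes would otherwise disagree on the route.

```python
import zlib
from collections import defaultdict


def pick_line(pid: str, n_workshops: int) -> int:
    """Buyer side: map a pid to a production line index (assumed rule)."""
    return zlib.crc32(pid.encode("utf-8")) % n_workshops


class Director:
    """Workshop side: keep a roster that maps each pid to one worker's belt."""

    def __init__(self):
        # pid -> list of materials waiting on that worker's conveyor belt
        self.roster = defaultdict(list)

    def assign(self, material: dict) -> None:
        # Materials sharing a pid always land on the same worker's belt.
        self.roster[material["pid"]].append(material)
```

Because every material with the same pid maps to the same line, all parts of one product reach the same workshop and therefore the same worker.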
# build image
>>> docker build -t test:1.0 .
# run the image as a container
>>> docker run -i -t -d -v $(pwd):/app --name test test:1.0
>>> docker exec -i -t test /bin/bash
>>> python3 ceo.py
This project only provides the 'factory'. To decide what 'product' to make, you need to create your own 'blueprint'.
I tested this project on my 16-inch MacBook Pro (i7, 2.6 GHz), with 8 CPUs and 8 GB of RAM allocated to Docker. In this test the project purchases 5000+ materials per second and hands them to 5 workshops to make products. No production line has a backlog, and every workshop's worker roster stays stable at around 5000 entries.
But if the purchasing speed is not limited, the quantity of raw materials far exceeds the actual production capacity. A large backlog of raw materials builds up on the production lines and seriously reduces their performance. The workshops are then unable to obtain enough raw materials, and the company's production capacity drops significantly.
So the best solution is to set the number of workshops based on the available resources (number of CPU cores) and then set a reasonable purchasing speed, so that purchasing matches the actual production capacity and no backlog builds up on the production lines.
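A rough sketch of both knobs follows. The rule of reserving three cores for the non-workshop processes is an assumption that happens to match the 8-CPU, 5-workshop test above; it is not stated by the project. `workshop_count` and `throttled` are hypothetical helpers, and `sleep` and `clock` are parameters only so the pacing can be checked without waiting.

```python
import os
import time


def workshop_count(cpu_cores=None, reserved=3):
    """Pick the number of workshops, leaving `reserved` cores for other processes."""
    cores = cpu_cores if cpu_cores is not None else (os.cpu_count() or 1)
    return max(1, cores - reserved)


def throttled(materials, per_second, sleep=time.sleep, clock=time.monotonic):
    """Yield materials with at least 1/per_second seconds between them."""
    interval = 1.0 / per_second
    last = None
    for material in materials:
        if last is not None:
            wait = interval - (clock() - last)
            if wait > 0:
                sleep(wait)
        last = clock()
        yield material
```

The Buyer would wrap its material source in `throttled(...)` and tune `per_second` until the production lines stop accumulating a backlog.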
- Continuously optimize the code to improve the running speed
- Add type hints
- Speed up the 'factory' in other ways, for example by compiling the Python script files (.py) to binary files (.so).
[1] In this project, aioprocessing is used only for its AioJoinableQueue, a thread-safe asynchronous queue that gathers the output from multiple processes. Because it is written in pure Python, the speed of AioJoinableQueue is not satisfactory, so it may be replaced in the future.
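For illustration only, here is one way an asyncio consumer can read from a blocking multiprocessing queue using just the standard library: the blocking `get` is handed to the event loop's default thread pool with `run_in_executor`. This is a sketch of the general pattern, not the project's code and not a tested replacement; `gather_results` and `demo` are hypothetical names, and whether this is faster than AioJoinableQueue would have to be measured.

```python
import asyncio
import multiprocessing as mp


async def gather_results(q, n):
    """Await n items from a blocking queue without stalling the event loop."""
    loop = asyncio.get_running_loop()
    results = []
    for _ in range(n):
        # q.get blocks, so run it in the loop's default thread pool.
        item = await loop.run_in_executor(None, q.get)
        results.append(item)
        q.task_done()
    return results


def demo():
    """Single-process demo; in real use other processes would call q.put."""
    q = mp.JoinableQueue()
    for i in range(3):
        q.put(i)
    results = asyncio.run(gather_results(q, 3))
    q.join()  # returns once task_done was called for every item
    return results
```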
[2] The contract mechanism makes it possible to decide, once a period has passed without new material, that the order has been cancelled and to fire the worker. This frees resources.
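A minimal sketch of such a contract, assuming it is a per-pid expiry time that is pushed forward whenever new material arrives. The `Roster` class, its method names, and the `contract_seconds` parameter are hypothetical; the clock is injectable so the behaviour can be checked without waiting.

```python
import time


class Roster:
    """Worker roster where every pid holds a contract with an expiry time."""

    def __init__(self, contract_seconds, clock=time.monotonic):
        self.contract_seconds = contract_seconds
        self.clock = clock
        self.expires_at = {}  # pid -> time at which the contract runs out

    def on_material(self, pid):
        """Sign a contract for a new pid, or renew the existing one."""
        self.expires_at[pid] = self.clock() + self.contract_seconds

    def fire_expired(self):
        """Remove and return every pid whose contract has run out."""
        now = self.clock()
        fired = [pid for pid, t in self.expires_at.items() if t <= now]
        for pid in fired:
            del self.expires_at[pid]
        return fired
```

In terms of the roles above, the Director would call `on_material` for each incoming material, and the Monitor would call `fire_expired` on each periodic roster check.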