mymarilyn / aioch
aioch is a library for accessing a ClickHouse database over the native interface from asyncio.
License: Other
Hi,
Let's say that I have a ClickHouse cluster with 10 nodes.
Let's say that I also have 50 queries that I would like to execute on the nodes' distributed tables, getting the results back as pandas DataFrames.
I have seen an example at https://docs.python.org/3/library/asyncio-queue.html (bottom of the page), but I don't know how to use aioch for the same purpose.
In my case the 50 queries would take the place of sleep_for in that example, and there would be 10 tasks, one per node.
As soon as one query finishes, the freed server should take the next query and execute it.
Has anyone done something like that?
Regards.
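The pattern described above (a shared queue of queries drained by one worker per node) could be sketched roughly as follows. This is only an illustration under assumptions: `run_query`, `fake_run_query`, and the node names are hypothetical stand-ins, and the real call would presumably be something like `await client.execute(query)` on a per-node aioch `Client`.

```python
import asyncio


async def worker(node, queue, run_query, results):
    """Pull queries off the shared queue until it is empty."""
    while True:
        try:
            index, query = queue.get_nowait()
        except asyncio.QueueEmpty:
            return  # no work left for this node
        results[index] = await run_query(node, query)


async def run_all(nodes, queries, run_query):
    """Run every query once, with at most one query in flight per node."""
    queue = asyncio.Queue()
    for item in enumerate(queries):
        queue.put_nowait(item)
    results = [None] * len(queries)
    await asyncio.gather(*(worker(n, queue, run_query, results) for n in nodes))
    return results


async def fake_run_query(node, query):
    # Hypothetical stand-in for a real per-node query call;
    # replace with the actual aioch call.
    await asyncio.sleep(0.01)
    return (node, query)


if __name__ == "__main__":
    nodes = [f"node{i}" for i in range(10)]       # hypothetical host names
    queries = [f"SELECT {i}" for i in range(50)]
    out = asyncio.run(run_all(nodes, queries, fake_run_query))
    print(len(out))
```

Because each worker only asks for the next query after its previous one has returned, a node that finishes early picks up more work automatically. Each result could then be wrapped in a pandas DataFrame by the caller.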
About 10 months ago, clickhouse-driver added a third argument to the Client.substitute_params method. This update was not reflected in aioch, and it is making all execute_iter() and execute_with_progress() calls fail with
TypeError: substitute_params() missing 1 required positional argument: 'context'
These were recently added convenience methods that I'd love to take advantage of: https://clickhouse-driver.readthedocs.io/en/latest/api.html#clickhouse_driver.Client.insert_dataframe
I would be happy to implement this if the PR has a chance of being merged once completed!
Thanks
Is the async version of the driver being maintained?
clickhouse-client is great, is being updated, and is recommended by many.
But is this async wrapper being updated? Does it need the updates?
Is DB API supported for aioch?
GCP - 24 CPU, 32 RAM.
Semaphore - just 4
Query is fast enough
The script starts out running well, but after some time I start getting this error:
Error on socket shutdown: [Errno 107] Transport endpoint is not connected
Before that I was getting a different error: OSError: [Errno 24] Too many open files
I fixed that one by raising the limit: ulimit -n 100000
async def get_result(comb):
    sem = asyncio.Semaphore(4)
    async with sem:
        client = Client('localhost', database='sna_gandalf')
        num = await client.execute(
            'select count(distinct id_follower)*20 from followers_women sample 0.05 where arrayExists(x -> x = id_blogger, ' + str(comb) + ') = 1 select sum(followers) from bloggers_tmp_price where arrayExists(x -> x = id, ' + str(comb) + ') = 1 select sum(money) from bloggers_tmp_price where arrayExists(x -> x = id, ' + str(comb) + ') = 1')
        combinations_dict[str(comb)] = str(num[0][0]) + ', ' + str(num[1][0]) + ', ' + str(num[2][0])

combinations_dict = {}
for i in range(1, 8):
    print('range ', i, ' done')
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(
        [get_result(list(comb)) for comb in itertools.combinations(list(followers_df.id_blogger.unique()), i)]))
Google does not help me =(
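One possible contributor, judging only from the snippet above: the semaphore is created inside get_result, so every call gets its own fresh semaphore and nothing is actually limited to 4. Every call also opens a new Client, which would be consistent with the earlier "Too many open files" error. A minimal sketch of sharing a single semaphore across all tasks follows; `fake_query` and `run_limited` are hypothetical names, and the stand-in sleeps instead of talking to ClickHouse.

```python
import asyncio


async def limited(sem, func, arg, stats):
    """Run func(arg) while holding the shared semaphore."""
    async with sem:
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        try:
            return await func(arg)
        finally:
            stats["running"] -= 1


async def run_limited(func, args, limit=4):
    # One semaphore for the whole batch, created once rather than per call.
    sem = asyncio.Semaphore(limit)
    stats = {"running": 0, "peak": 0}  # only here to observe concurrency
    results = await asyncio.gather(
        *(limited(sem, func, arg, stats) for arg in args)
    )
    return results, stats["peak"]


async def fake_query(n):
    # Hypothetical stand-in for a real query call; replace as needed.
    await asyncio.sleep(0.01)
    return n * 2


if __name__ == "__main__":
    results, peak = asyncio.run(run_limited(fake_query, range(20), limit=4))
    print(len(results), peak)
```

With the semaphore shared, no more than `limit` queries are in flight at once. Reusing a small, fixed set of clients instead of creating one per call would likely help with the open-file count as well, though that is an assumption about the cause and not something confirmed in the thread.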
python 3.8
aioch 0.0.2
AttributeError: 'ColumnOrientedBlock' object has no attribute 'rows'
when running the code from the README.md examples (the part with progress).
Line 27 in fb32065
Maybe block.rows should be block.get_rows()?
Hi,
I ran your example with modified selects:
from datetime import datetime
import asyncio
from aioch import Client

async def exec_progress():
    client = Client('localhost', port=2441)
    progress = await client.execute_with_progress('select 1')
    timeout = 20
    started_at = datetime.now()
    async for num_rows, total_rows in progress:
        done = num_rows / total_rows if total_rows else total_rows
        now = datetime.now()
        # Cancel query if it takes more than 20 seconds to process 50% of rows.
        if (now - started_at).total_seconds() > timeout and done < 0.5:
            await client.cancel()
            break
    else:
        rv = await progress.get_result()
        print(rv)

async def exec_no_progress():
    client = Client('localhost', port=2441)
    rv = await client.execute('select 2')
    print(rv)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([exec_progress(), exec_no_progress()]))
but I got an error:
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-1-a99ee321e210> in <module>
31
32 loop = asyncio.get_event_loop()
---> 33 loop.run_until_complete(asyncio.wait([exec_progress(), exec_no_progress()]))
~\Miniconda3\lib\asyncio\base_events.py in run_until_complete(self, future)
569 future.add_done_callback(_run_until_complete_cb)
570 try:
--> 571 self.run_forever()
572 except:
573 if new_task and future.done() and not future.cancelled():
~\Miniconda3\lib\asyncio\base_events.py in run_forever(self)
524 self._check_closed()
525 if self.is_running():
--> 526 raise RuntimeError('This event loop is already running')
527 if events._get_running_loop() is not None:
528 raise RuntimeError(
RuntimeError: This event loop is already running
I don't understand what is wrong.
Regards.
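The `<ipython-input-1-...>` frame in the traceback suggests the code was run in IPython or Jupyter, where an event loop is typically already running, so `loop.run_until_complete(...)` cannot start it again. A minimal sketch of one way around this, with hypothetical stand-in coroutines in place of the aioch calls: wrap the work in a single `main()` coroutine, `await main()` directly in a notebook cell, and use `asyncio.run` only from a plain script.

```python
import asyncio


async def exec_a():
    await asyncio.sleep(0.01)  # stand-in for a client.execute(...) call
    return 1


async def exec_b():
    await asyncio.sleep(0.01)  # stand-in for a second query
    return 2


async def main():
    # gather accepts coroutines directly and returns results in call order.
    return await asyncio.gather(exec_a(), exec_b())


def run(coro_func):
    """Start a loop from a plain script; refuse inside a running loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro_func())  # no loop yet, safe to start one
    raise RuntimeError("a loop is already running; use `await main()` instead")


if __name__ == "__main__":
    print(run(main))
```

In a notebook cell, `await main()` at the top level would take the place of `run(main)`. Separately, passing bare coroutines to `asyncio.wait` is deprecated since Python 3.8, which is another reason to prefer `asyncio.gather` here.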
Just interested if you have planned it.