Comments (3)
This is the version that works with ThreadPoolExecutor, but it seems to block. Here is the method that uses thread pooling:
import timeit
import warnings

import pandas as pd
import psycopg

def read_sql(self, sql, params=None):
    t1 = timeit.default_timer()
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", UserWarning)
        with psycopg.connect(self.connect_str, autocommit=True) as conn:
            df = pd.read_sql(sql, con=conn, params=params)
    self.log.debug(f'perf: {timeit.default_timer() - t1}')
    return df
The concurrent.futures code is this:
import concurrent.futures as cf

import numpy as np

def test_thread_pool():
    db_reader = DataReader()
    sql = "select id, name from factor f where f.id = ANY(%s)"
    threads = 20
    id_partitions = np.array_split(list(range(1, 10000)), threads)
    id_partitions = [[p.tolist()] for p in id_partitions]
    with cf.ThreadPoolExecutor(max_workers=threads) as executor:
        futures = {
            executor.submit(db_reader.read_sql, sql, params=p): p
            for p in id_partitions
        }
        for future in cf.as_completed(futures):
            ids = futures[future]
            try:
                df = future.result()
            except Exception:
                log.exception(f'error retrieving data for: {ids}')
            else:
                if df is not None:
                    print(f'shape: {df.shape}')
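On a smaller input, the partitioning step above produces nested lists like this (a standalone sketch of the same np.array_split call, detached from the database code):

```python
import numpy as np

threads = 4
parts = np.array_split(list(range(1, 11)), threads)
# wrap each chunk in an extra list so psycopg binds it as a single
# array parameter for the ANY(%s) placeholder
id_partitions = [[p.tolist()] for p in parts]
print(id_partitions[0])  # → [[1, 2, 3]]
```

np.array_split distributes the remainder, so the first chunks are one element longer than the last ones.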
The output of the debug line from read_sql looks like this:
perf: 0.7313497869981802
perf: 0.8116309550023288
perf: 3.401154975006648
perf: 5.22201336100261
perf: 6.325166654998611
perf: 6.338692951001576
perf: 6.573095380997984
perf: 6.5976604809984565
perf: 6.8282670119951945
perf: 7.291718505999597
perf: 7.4276196580030955
perf: 7.407097272000101
perf: 8.38801568299823
perf: 9.119963648998237
You'll notice that the times keep incrementing; I'd have expected them all to be roughly the same, so it seems there is some SQL blocking. Also, the gap between the first two threads and the third is always about 2-3 seconds - why is that?
I've also tried creating a new DataReader instance for each thread, but it has the same effect.
Does anyone know whether the connection or pandas read_sql blocks, or how to solve this?
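One way to isolate where the blocking happens is to replace the query with a sleep-based stub (fake_query below is an illustration, not part of the original code). time.sleep releases the GIL just as a socket wait would, so if this scales but the real queries still increment, the serialization is happening in the database or in GIL-bound work such as building the DataFrame, not in the pool itself:

```python
import concurrent.futures as cf
import time
import timeit

def fake_query(seconds):
    # time.sleep releases the GIL, like waiting on a network socket
    time.sleep(seconds)
    return seconds

def run_pool(threads=8, per_task=0.2):
    t0 = timeit.default_timer()
    with cf.ThreadPoolExecutor(max_workers=threads) as executor:
        futures = [executor.submit(fake_query, per_task) for _ in range(threads)]
        for future in cf.as_completed(futures):
            future.result()
    return timeit.default_timer() - t0

# with true concurrency the wall time is close to one task's duration,
# not threads * per_task
elapsed = run_pool()
```

If the sleeps run concurrently but the DB calls do not, checking the server side (per-connection locks, sequential scans on the same table) is the next step.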
from pandas.
I have not had enough time to look at this, but from the issue title, shouldn't you await this to remove the warning:

tasks = {db_reader.read_sql_async(sql, params=p) for p in id_partitions}

Because db_reader.read_sql_async is an async method.
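For illustration, the pattern being suggested looks like this (read_sql_async here is a stand-in stub that just sleeps; a real implementation would await an async database driver):

```python
import asyncio

async def read_sql_async(params):
    # stub standing in for db_reader.read_sql_async
    await asyncio.sleep(0.01)
    return len(params)

async def main():
    tasks = [asyncio.create_task(read_sql_async(list(range(n))))
             for n in (1, 2, 3)]
    results = []
    for coro in asyncio.as_completed(tasks):
        results.append(await coro)  # the await happens here, per task
    return results

print(sorted(asyncio.run(main())))  # → [1, 2, 3]
```

Creating the coroutines in a set comprehension without awaiting them is what triggers the "coroutine was never awaited" warning; each one must eventually be awaited, e.g. inside the as_completed loop.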
I am. You can see the await in the for/as_completed loop.
I think the problem is probably that an async connection or cursor doesn't work with pandas. If so, I'd like to understand whether pandas blocks, and, as per the second example, why I am not able to achieve better concurrency.
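Since pd.read_sql is synchronous, one hedged workaround is to run each blocking call on a worker thread via asyncio.to_thread, which keeps the event loop free (blocking_read below is a stub standing in for the real read_sql call):

```python
import asyncio
import time

def blocking_read(n):
    # stand-in for a synchronous pd.read_sql call
    time.sleep(0.05)
    return n * 2

async def gather_reads():
    # asyncio.to_thread moves each blocking call onto a worker thread;
    # gather preserves the submission order of the results
    calls = (asyncio.to_thread(blocking_read, n) for n in range(4))
    return await asyncio.gather(*calls)

print(asyncio.run(gather_reads()))  # → [0, 2, 4, 6]
```

This still runs on threads under the hood, so it inherits the same GIL constraints as the ThreadPoolExecutor version; it only changes how the calls integrate with async code.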