Comments (6)
I'm not aware of any HTTP mechanism that combines multiple independent requests into a single response via server-side batching. With HTTP, each request always receives its own corresponding response. Things like keep-alive and pipelining can happen under the hood to allow connection reuse (and HTTP/2 multiplexing allows out-of-order responses), but each request still receives its dedicated response.
from requests-futures.
Thank you @ross. Turns out the server was internally batching, but sending individual responses.
On the client side, I am appending the responses and using as_completed. Is there a way to get only the "new" responses since the last call of as_completed, or is the management of this left to the user?
In general I think as_completed is intended to be used as a generator: call it once and iterate over the results as they complete:
    from concurrent.futures import as_completed

    for future in as_completed(futures):
        # do something with each future as it's ready
        pass
    # by this point all of the futures have completed
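As a self-contained illustration of that pattern, here is a minimal sketch that fakes the requests with sleeps. The names `fake_request` and `collect_in_completion_order` are hypothetical and not part of requests-futures. Note that as_completed only iterates over the futures it was given when it was called, so anything appended to the list afterwards is not picked up by that loop.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep


def fake_request(i, delay):
    # hypothetical stand-in for an HTTP call: sleep, then hand back the index
    sleep(delay)
    return i


def collect_in_completion_order(delays):
    # submit everything up front, then iterate once as the results arrive
    finished = []
    with ThreadPoolExecutor(max_workers=max(1, len(delays))) as executor:
        futures = [
            executor.submit(fake_request, i, delay)
            for i, delay in enumerate(delays)
        ]
        for future in as_completed(futures):
            # each future is yielded as soon as it has completed
            finished.append(future.result())
    # by this point all of the futures have completed
    return finished


order = collect_in_completion_order([0.3, 0.1, 0.2])
print(order)
```

The printed order depends on which sleeps finish first, so it will usually differ from the submission order.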
My use case is streaming video: I send frames as they are read from the source in near real time, the server sends the responses, and I append the responses. I also need to process the received responses in near real time, so futures is always getting new responses appended. I guess in my case I need to manage futures myself (keep track of what has already been read and drop that response from futures)?
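The manual bookkeeping described here could look roughly like the sketch below. The helper name `drain_completed` is hypothetical, and it assumes the same thread both appends to the list and drains it (otherwise a lock would be needed around both operations). The Future objects are created by hand purely for demonstration.

```python
from concurrent.futures import Future


def drain_completed(futures):
    """Remove the finished futures from `futures` (in place) and return them."""
    done, pending = [], []
    for future in futures:
        # check each future exactly once so none can be lost in between
        (done if future.done() else pending).append(future)
    futures[:] = pending
    return done


# demonstration with hand-made futures standing in for real requests
first, second = Future(), Future()
futures = [first, second]
first.set_result("frame-0 response")

ready = drain_completed(futures)
print([f.result() for f in ready], len(futures))
```

Polling like this is simple, but it has to be called repeatedly; blocking until something finishes avoids the busy loop.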
Ah, you'll want to look at wait (https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.wait) rather than as_completed, and use return_when=FIRST_COMPLETED. I'd guess you'd probably want to use it in a thread; something like the following would probably work. Note that this was quickly thrown together because I found the problem interesting; it's very much a proof of concept and not complete or tested code:
    #!/usr/bin/env python3

    from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
    from threading import Event, Thread
    from random import uniform
    from time import sleep
    from logging import DEBUG, basicConfig, getLogger
    from signal import SIGINT, signal

    basicConfig(level=DEBUG)


    def do_work(i, delay):
        sleep(delay)
        return i, delay


    class Worker(Thread):
        log = getLogger("Worker")

        def __init__(self):
            self.log.info("__init__")
            super().__init__(name="Worker")
            self.running = Event()
            self.executor = ThreadPoolExecutor(max_workers=2)
            self.futures = set()
            self.i = 0
            # start out 3 requests
            self.request()
            self.request()
            self.request()

        def request(self):
            # this is just doing sleeps of random durations to fake like they're
            # http requests since i don't want to hammer a web service somewhere
            delay = uniform(1, 5)
            self.log.info("request: delay=%0.2f", delay)
            future = self.executor.submit(do_work, self.i, delay)
            self.i += 1
            self.futures.add(future)

        def run(self):
            self.log.info("run:")
            while not self.running.is_set():
                self.log.info("run: len(futures)=%d", len(self.futures))
                if not self.futures:
                    # wait around for more work to do, this could be shorter to be
                    # more responsive at the cost of busy waiting
                    sleep(1)
                    continue
                done, _ = wait(self.futures, timeout=1, return_when=FIRST_COMPLETED)
                for future in done:
                    # remove this one from the set we're expecting
                    self.futures.remove(future)
                    # TODO: would need to error handle here
                    i, delayed = future.result()
                    self.log.info("run: future=%s, i=%d, delayed=%0.2f", future, i, delayed)
                    # start another request
                    self.request()
            # we're abandoning anything inprogress here...
            self.log.info("run: finished")

        def stop(self):
            self.running.set()


    worker = Worker()


    # handle ctrl-c
    def handler(*args, **kwargs):
        worker.stop()


    signal(SIGINT, handler)

    worker.start()
    worker.join()
(env) coho:tmp ross$ ./consumer.py
INFO:Worker:__init__
INFO:Worker:request: delay=4.17
INFO:Worker:request: delay=2.29
INFO:Worker:request: delay=3.94
INFO:Worker:run:
INFO:Worker:run: len(futures)=3
INFO:Worker:run: len(futures)=3
INFO:Worker:run: len(futures)=3
INFO:Worker:run: future=<Future at 0x103f7e140 state=finished returned tuple>, i=1, delayed=2.29
INFO:Worker:request: delay=1.58
INFO:Worker:run: len(futures)=3
INFO:Worker:run: len(futures)=3
INFO:Worker:run: future=<Future at 0x103f7dde0 state=finished returned tuple>, i=0, delayed=4.17
INFO:Worker:request: delay=1.82
INFO:Worker:run: len(futures)=3
INFO:Worker:run: len(futures)=3
INFO:Worker:run: future=<Future at 0x103f7e5c0 state=finished returned tuple>, i=3, delayed=1.58
INFO:Worker:request: delay=2.36
INFO:Worker:run: len(futures)=3
INFO:Worker:run: future=<Future at 0x103f7e3e0 state=finished returned tuple>, i=2, delayed=3.94
INFO:Worker:request: delay=3.46
INFO:Worker:run: len(futures)=3
INFO:Worker:run: len(futures)=3
INFO:Worker:run: future=<Future at 0x103f7e140 state=finished returned tuple>, i=4, delayed=1.82
INFO:Worker:request: delay=2.70
INFO:Worker:run: len(futures)=3
INFO:Worker:run: len(futures)=3
INFO:Worker:run: future=<Future at 0x103f7dde0 state=finished returned tuple>, i=5, delayed=2.36
INFO:Worker:request: delay=2.48
INFO:Worker:run: len(futures)=3
INFO:Worker:run: len(futures)=3
INFO:Worker:run: len(futures)=3
INFO:Worker:run: future=<Future at 0x103f7e5c0 state=finished returned tuple>, i=6, delayed=3.46
INFO:Worker:request: delay=2.04
INFO:Worker:run: len(futures)=3
INFO:Worker:run: future=<Future at 0x103f7e3e0 state=finished returned tuple>, i=7, delayed=2.70
INFO:Worker:request: delay=1.12
INFO:Worker:run: len(futures)=3
INFO:Worker:run: len(futures)=3
INFO:Worker:run: len(futures)=3
INFO:Worker:run: future=<Future at 0x103f7dde0 state=finished returned tuple>, i=9, delayed=2.04
INFO:Worker:request: delay=2.17
INFO:Worker:run: len(futures)=3
INFO:Worker:run: future=<Future at 0x103f7e140 state=finished returned tuple>, i=8, delayed=2.48
INFO:Worker:request: delay=2.27
INFO:Worker:run: len(futures)=3
INFO:Worker:run: future=<Future at 0x103f7e5c0 state=finished returned tuple>, i=10, delayed=1.12
INFO:Worker:request: delay=3.91
INFO:Worker:run: len(futures)=3
INFO:Worker:run: len(futures)=3
INFO:Worker:run: future=<Future at 0x103f7e3e0 state=finished returned tuple>, i=11, delayed=2.17
INFO:Worker:request: delay=2.67
INFO:Worker:run: len(futures)=3
INFO:Worker:run: len(futures)=3
INFO:Worker:run: future=<Future at 0x103f7dde0 state=finished returned tuple>, i=12, delayed=2.27
INFO:Worker:request: delay=3.24
INFO:Worker:run: len(futures)=3
^CINFO:Worker:run: finished
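The core of that loop can be condensed into a small, deterministic sketch. It is only an illustration under stated assumptions: `fake_request` and `process_stream` are hypothetical names, the sleeps stand in for HTTP requests, and the loop stops once the input runs out rather than running until interrupted. It relies on wait returning a (done, not_done) pair of sets, so the not_done set can be carried forward as the new pending set.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from time import sleep


def fake_request(i, delay):
    # hypothetical stand-in for an HTTP call
    sleep(delay)
    return i


def process_stream(delays, max_in_flight=2):
    results = []
    work = iter(enumerate(delays))
    pending = set()
    with ThreadPoolExecutor(max_workers=max_in_flight) as executor:
        while True:
            # top up the in-flight set while there is still work to submit
            while len(pending) < max_in_flight:
                item = next(work, None)
                if item is None:
                    break
                pending.add(executor.submit(fake_request, *item))
            if not pending:
                break
            # block until at least one future finishes; keep the rest pending
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                results.append(future.result())
    return results


print(process_stream([0.05, 0.01, 0.02]))
```

Because each call to wait hands back only the newly finished futures, nothing is processed twice and no separate record of "already read" responses is needed.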
Awesome. Thank you! Will try this out.