hubo1016 / aiogrpc Goto Github PK
View Code? Open in Web Editor NEWasyncio wrapper for grpc.io
License: Apache License 2.0
asyncio wrapper for grpc.io
License: Apache License 2.0
Is there any plan to add "async with" support? It seems that there is no "aexit" method for channel objects
Are there any plans to support the channel interceptor APIs? Or should it just work out of the box.
To demonstrate the problem:
cancel-task-async-for-blocked
(gjcarneiro@f6391fb)I often (not always) get an exception:
Future exception was never retrieved
future: <Future finished exception=<_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.CANCELLED
details = "Channel closed!"
debug_error_string = "{"created":"@1591798471.711772955","description":"Error received from peer ipv4:127.0.0.1:9901","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Channel closed!","grpc_status":1}"
>>
Traceback (most recent call last):
File "/usr/lib/python3.8/concurrent/futures/thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "/home/gjc/projects/aiogrpc/aiogrpc/utils.py", line 126, in _next
return next(self._iterator)
File "/home/gjc/.virtualenvs/aiogrpc/lib/python3.8/site-packages/grpcio-1.30.0rc1-py3.8-linux-x86_64.egg/grpc/_channel.py", line 416, in __next__
return self._next()
File "/home/gjc/.virtualenvs/aiogrpc/lib/python3.8/site-packages/grpcio-1.30.0rc1-py3.8-linux-x86_64.egg/grpc/_channel.py", line 706, in _next
raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.CANCELLED
details = "Channel closed!"
debug_error_string = "{"created":"@1591798471.711772955","description":"Error received from peer ipv4:127.0.0.1:9901","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Channel closed!","grpc_status":1}"
It happens when:
Taks.cancel()
).Then I get asyncio complaining about "Future exception was never retrieved".
Just tested your code with my project.
I get an error like:
AttributeError: 'list_iterator' object has no attribute '__anext__'
I use a grpc stream and want to iterate on it.
Basically my code is easy:
async def get_events():
events = my_stream_method()
async for v in events:
print(v)
return v
async def main():
try:
events = await asyncio.wait_for(get_events(), timeout=30.0)
except asyncio.TimeoutError:
print('timeout!')
else:
print(events)
asyncio.run(main(), debug=True)
my_stream_method
return a grpc stream as defined in the proto as:
// DeliverResponse
message DeliverResponse {
oneof Type {
string status = 1;
}
}
rpc my_stream_method (stream common.Envelope) returns (stream DeliverResponse) {
}
Do you know what I'm doing wrong?
When I look a the type of events = my_stream_method()
I get a WrappedIterator
, If I understand correctly, doing async for v in events
will create a WrappedAsyncIterator
.
Unfortunately, it will fail on this line:
self._next_future = asyncio.ensure_future(self._async_iter.__anext__(), loop=self._loop)
telling me self._async_iter
is a list_iterator: /
self._async_iter
seems to have been computed thanks to self._next_future = self._loop.run_in_executor(self._stream_executor, self._next)
in the __anext__
method of the WrappedIterator
.
Thank you,
from aiogrpc import insecure_channel
import asyncio
from mystub import MyStub
channel = insecure_channel('ipv4:///127.0.0.1:8080')
mystub = MyStub(channel)
async test_call():
return await mystub.mymethod(...)
async test_call_stream():
async for v in mystub.my_stream_method(...):
this is an example in README, however may be missed a def
docstring?
Is it supposed to work with Python 3.8? I get the following error:
from asyncio.futures import CancelledError
ImportError: cannot import name 'CancelledError' from 'asyncio.futures' (/usr/lib/python3.8/asyncio/futures.py)
When running (in a clean venv):
pip install aiogrpc
python
>> import aiogrpc.utils
With the latest commits fixing the build for Python 3.8, I think it would be worth releasing 1.7 and deploying it to PyPi ๐.
ฮป python .\testclient.py
E1030 15:36:09.480000000 11252 src/core/ext/transport/chttp2/server/insecure/server_chttp2.c:38] {"created":"@1509348969.480000000","description":"No address added out of total 1 resolved","file":"src/core/ext/transport/chttp2/server/chttp2_server.c","file_line":245,"referenced_errors":[{"created":"@1509348969.480000000","description":"Failed to add port to server","file":"src/core/lib/iomgr/tcp_server_windows.c","file_line":510,"referenced_errors":[{"created":"@1509348969.480000000","description":"OS Error","file":"src/core/lib/iomgr/tcp_server_windows.c","file_line":199,"os_error":"Only one usage of each socket address (protocol/network address/port) is normally permitted.\r\n","syscall":"bind","wsa_error":10048}]}]}
.....ERROR:root:Exception iterating requests!
Traceback (most recent call last):
File "C:\ProgramData\Anaconda3\lib\site-packages\grpc\_channel.py", line 185, in consume_request_iterator
request = next(request_iterator)
File "C:\ProgramData\Anaconda3\lib\site-packages\aiogrpc\utils.py", line 223, in __next__
raise r
File "C:\ProgramData\Anaconda3\lib\site-packages\aiogrpc\utils.py", line 204, in _next
self._q.put((await nf, False))
File ".\testclient.py", line 119, in test_input2
raise ValueError('testerror')
ValueError: testerror
...
----------------------------------------------------------------------
Ran 8 tests in 1.314s
OK
When this package is used in other threads than main thread (ScriptRunner.scriptThread
in streamlit by example), it throws RuntimeError: There is no current event loop in thread
from Channel.py.
....
aiogrpc/channel.py", line 324, in __init__
loop = _asyncio.get_event_loop()
....
File "lib/python3.9/asyncio/events.py", line 642, in get_event_loop
raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'ScriptRunner.scriptThread'.
I'm taking the following suggestion.(I was not able to find the original question and answer).
https://techoverflow.net/2020/10/01/how-to-fix-python-asyncio-runtimeerror-there-is-no-current-event-loop-in-thread/
in channel.py
if loop is None:
try:
loop = _asyncio.get_event_loop()
except RuntimeError as ex:
if "There is no current event loop in thread" in str(ex):
new_loop = _asyncio.new_event_loop()
_asyncio.set_event_loop(new_loop)
loop = _asyncio.get_event_loop()
It works fine for me so far, but I wonder it is valid for all test case.
Hi,
How are you thinking about backwards compatibility to Python 3.5?
I have a project where I unfortunately have to support Python 3.5..
Cheers,
Matthias
do you have any plans to support the wait_for_ready
semantics recently added to the gRPC python API? https://github.com/grpc/grpc/tree/master/examples/python/wait_for_ready
It seems like it would just involve adding an optional wait_for_ready
param to the MultiCallable classes and using them when calling the future
method (https://github.com/grpc/grpc/blob/55bbf1cc1c66886392ff15ad31b65f7f8b8cf4fa/src/python/grpcio/grpc/_channel.py)
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.