amphp / byte-stream Goto Github PK
View Code? Open in Web Editor NEWA non-blocking stream abstraction for PHP based on Amp.
Home Page: https://amphp.org/byte-stream
License: MIT License
A non-blocking stream abstraction for PHP based on Amp.
Home Page: https://amphp.org/byte-stream
License: MIT License
Very large chunks, e.g. 1 GB chunks, lead to performance issues due to many small string ops and new buffer allocations. I have a patch locally already.
php://temp
and php://memory
streams result in 100% CPU usage. It's probably due to them not being actual streams that can be used with stream_select
, so there should probably be a test for that.
https://gist.github.com/umbri/2a3e558d487b5e6756768547e660f95a
<?php
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Loop;
require_once __DIR__ . "/../vendor/autoload.php";
Loop::run(function () {
$memory = fopen("php://temp", "r+");
$stdin = new ResourceInputStream($memory);
$stdout = new ResourceOutputStream($memory);
Loop::delay(1000, function () use ($stdout) {
echo "write" . PHP_EOL;
$stdout->write("Hello, World!");
});
while (($chunk = yield $stdin->read()) !== null) {
echo "data -> $chunk" . PHP_EOL;
}
});
Since a4739c8, following hangs with no output
<?php
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Loop;
require __DIR__ . "/../vendor/autoload.php";
$writeCoRoutine2 = \Amp\asyncCoroutine(function() {
$middleReadStream = new ResourceInputStream(fopen('middle', 'r'));
while (true) {
echo yield $middleReadStream->read();
}
});
Loop::run(function () use ($writeCoRoutine2) {
\Amp\ByteStream\pipe(
new ResourceInputStream(fopen('/home/g9735/Downloads/somefile.avi', 'r')),
new ResourceOutputStream(fopen('middle', 'w'))
);
$writeCoRoutine2();
});
as soon as read()
in $writeCoRoutine2 is executed, event loop will never return to pipe
kelunik@kelunik ❯ ~/GitHub/amphp/artax ❯ 10:24:18 ❯ master
$ php examples/1-get-request.php
HTTP/1.1 200 OK
{
"user-agent": "Mozilla/5.0 (compatible; Artax)"
}
PHP Warning: feof(): supplied resource is not a valid stream resource in /home/kelunik/GitHub/amphp/artax/vendor/amphp/byte-stream/lib/ResourceInputStream.php on line 60
I guess we can just suppress that one?
I've just released my application built on amphp at https://github.com/ostrolucky/stdinho Just before announcing it to world, I updated my dependencies. And it no longer works. Latest amphp/byte-stream it works with reliably is 1.2.2. 1.2.3 blocks in a way socket isn't created, 1.2.4 crashes with following error.
In ChannelParser.php line 61:
[Amp\Parallel\Sync\ChannelException]
Invalid packet received: \xce\xb3\x1a\x1b\x1\xe1\xff\x9d\xf8]D\xfb\xbc\xf4\
xa8\xb6\xd8\xd6oH\x18\xff\x9a\xa\x98\xb3\xc9\x5t \x7f\x4\xed\xf\xa8\xbf\xc3
?\x86{\xae\xd1\xd5\xbep\xa\xcc\xcbEIM;\xe9\x0\x95\xfb\x86\xebg\x3RwZ\x13\xa
6)Q|\x82y0\xa9\x86`\xc0\xd\x13\x82\xda\x99d\xcb\x9af\x9b\x81\xf1\xch\xd1\xa
e?\x3\xd0\x94&\\x1dVY\x3\xe1\xfc\xecK\x1dow\x15\xa1\x8a\x89n\xbc\x95t\xa6\x
f50\xf5";s:43:"\x0Amp\Parallel\Worker\Internal\TaskResult\x0id";s:2:"km";}
Exception trace:
Amp\Parallel\Sync\ChannelParser::parser() at n/a:n/a
Generator->send() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/parser/lib/Parser.php:102
Amp\Parser\Parser->push() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/parallel/lib/Sync/ChannelledStream.php:71
Amp\Parallel\Sync\ChannelledStream->Amp\Parallel\Sync\{closure}() at n/a:n/a
Generator->send() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/amp/lib/Coroutine.php:74
Amp\Coroutine->Amp\{closure}() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/amp/lib/Internal/Placeholder.php:127
class@anonymous\/media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/amp/lib/Deferred.php0x7f2c357fa2d3->resolve() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/amp/lib/Deferred.php:41
Amp\Deferred->resolve() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/byte-stream/lib/ResourceInputStream.php:90
Amp\ByteStream\ResourceInputStream::Amp\ByteStream\{closure}() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/amp/lib/Loop/NativeDriver.php:172
Amp\Loop\NativeDriver->selectStreams() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/amp/lib/Loop/NativeDriver.php:68
Amp\Loop\NativeDriver->dispatch() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/amp/lib/Loop/Driver.php:130
Amp\Loop\Driver->tick() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/amp/lib/Loop/Driver.php:70
Amp\Loop\Driver->run() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/amp/lib/Loop.php:76
Amp\Loop::run() at /media/gadelat/sdata/GDrive/src/php/stdinho/bin/stdinho:50
Closure->{closure}() at n/a:n/a
call_user_func() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/symfony/console/Command/Command.php:250
Symfony\Component\Console\Command\Command->run() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/symfony/console/Application.php:865
Symfony\Component\Console\Application->doRunCommand() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/symfony/console/Application.php:241
Symfony\Component\Console\Application->doRun() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/symfony/console/Application.php:143
Symfony\Component\Console\Application->run() at /media/gadelat/sdata/GDrive/src/php/stdinho/bin/stdinho:56
Codebase is simple, please clone it and try it. I have no idea where could be a problem. It can be reproduced by running following:
#terminal 1
$ head -c 1G </dev/urandom | bin/stdinho 127.0.0.1:1337 -v
#terminal 2
$ curl localhost:1337 > /dev/null
GzipInputStream
already exists.
Hi.
It would be great to have possibility use custom delimiters in LineReader, e.g.
final class LineReader
{
...
public function __construct(InputStream $inputStream, $delimiter = "\n")
...
}
$reader = new \Amp\ByteStream\LineReader($stream, "\x0a\x00");
ResourceOutpurStream throw StreamException instead of ClosedException when resource is close
From docs:
If I understand correct: this code is used when buffer are NOT full and if buffers are full this code is used to schedule the writes, and this code are writing them
fwrite()
Currently we resolve promises from write
to the length written. I think we can remove that. That'd allow an optimization like using a singleton NullSuccess
object.
byte-stream/lib/ResourceOutputStream.php
Line 70 in 6d6c89f
I currently debugged an issue in IpcLogger of aerys. It hang with 100% CPU as the socket was not detected as dead. In my setup @fwrite does NOT populate error_get_last for some reason. Furthermore it also was not populated if the registered error handler returned 'true'.
The only really reliable solution (in IpcLogger) was like that:
$eh = set_error_handler([$this, 'onDeadIpcSock']);
$bytes = fwrite($this->ipcSock, $this->writeBuffer);
set_error_handler($eh);
If it affects that code then I am pretty sure it also affects the byte-stream code, right?
Not sure if this is a bug in PHP because in other places, the @ / error_reporting(0) DID populate the error_get_last
Any other idea to catch fwrite errors in a better way?
Reading after the stream has closed should always resolve to null
instead of throwing an exception.
I am using the ReadableIterableStream and it is awesome, but I am reading a very big list of large things. I am reading as quickly as the data comes in, but I'm still getting errors when I exceed the buffer size.
I can make the buffer bigger (and bigger and bigger -- 300MB now but will likely exceed 1GB) but chewing up all that ram for data I've already read is a waste and seems to impair performance.
ConcurrentQueueIterator
could delete data as it's read pretty easily, it looks like to me; FiberLocal
supports that operation already. There's nothing in the call chain that suggests to me that this option is already available, however.
In the most common use cases, once data is read it is no longer needed in the buffer - it has been buffed. A way to prevent this buildup would be welcome.
Typically I read from stream like this while (($data = yield $stream->read()) !== null)
and if I have readed null
stream is closed.
How can I know/find that stream is closed without read from it ?
How can I throttle stream ?
P.S: I want to implement something like pause()
, resume()
that will permit throttling, very useful for testing
Installing this library (as a dependency of amphp/parallel
) through composer results in a broken symlink in the docs directory:
$ ls -la vendor/amphp/byte-stream/docs
lrwxrwxrwx. 1 gen gen 13 Dec 27 15:28 asset -> .shared/asset
...
drwxrwxr-x. 2 gen gen 40 Oct 22 21:37 .shared
$ ls -la vendor/amphp/byte-stream/docs/.shared
total 0
drwxrwxr-x. 2 gen gen 40 Oct 22 21:37 .
drwxrwxr-x. 3 gen gen 300 Oct 22 21:37 ..
This broken link causes Gitlab CI to generate a warning while buliding a project:
No URL provided, cache will be not downloaded from shared cache server. Instead a local version of cache will be extracted.
WARNING: .../vendor/amphp/byte-stream/docs/asset: chmod .../vendor/amphp/byte-stream/docs/asset: no such file or directory (suppressing repeats)
(see a related Gitlab issue).
I hereby propose to drop Buffer
. As far as I'm aware there's not a single use of it in your libraries. It will be kept in the history and can be restored if need arises.
It can stay if somebody is willing to write tests for it, but I'm currently not, and I don't want to have it here without any tests.
I'm dealing with situation that following line
byte-stream/lib/ResourceOutputStream.php
Line 95 in 6bbfcb6
I suggest to create custom exception which carries $data
it was not succeeded to write.
I see this in my logs:
Amp\ByteStream\StreamException
Failed to write to stream after multiple attempts; fwrite(): send of 7302 bytes failed with errno=32 Broken pipe
This happens like every few seconds, I can try to provide a reproducible script, but that may take some time (currently on a business trip).
But I thought maybe you have an idea what to check.
There's no README currently.
ResourceInputStream
probably shouldn't null
the $resource
on __destruct()
. I think this would have prevented amphp/artax@v3.0.10^^...v3.0.10 and maybe similar bugs.
Hi,
I think the url should be written with https;// instead of http://.
Line 3 in 77decf1
Regards
$cancellation is not passed to split(). Apparently, this is not how it should be.
/**
* Splits the stream into lines.
*
* @return \Traversable<int, string>
*/
function splitLines(ReadableStream $source, ?Cancellation $cancellation = null): \Traversable
{
foreach (split($source, "\n") as $line) {
yield \rtrim($line, "\r");
}
}
byte-stream/lib/ResourceInputStream.php
Line 121 in 22113f2
In some rare cases if caller code doesn't check for null this may cause infinite loop and amp loop will not work.
Need to use immediateWatcher
instead. This may also affects output stream.
Also see related ticket:
amphp/ssh#16
We're currently at only 36%, that's way too low. This is an issue that should stay open as long as we're not on an acceptable level.
Exceptions are hidden inside Message
when streaming, they're fine when using the Promise
API.
I would like to change the chunksize
during Resouce(Input|Output)Stream
object lifecycle, not just when constructing it. Can we have chunksize mutator, or optional argument in read
?
It's clearly an Error, as it only occurs if you explicitly closed a stream and then still try to operate on it.
(Please fix in the next major to be released.)
Here is a problem which occurs if the script is run from systemd
Amp\Loop::run(function() {
$resourceOutputStream = new ByteStream\ResourceOutputStream(\STDOUT);
// Throws Amp\ByteStream\StreamException: The stream was closed by the peer
yield $resourceOutputStream->write('test');
});
I figured out that it stops happening if feof
check of the stream is removed in ResourceOutputStream.php:74
. Apparently it is not correct to use feof
for checking if stream is open or not, but I am not sure what can be done here as it has been added for a reason.
Maybe a fully working Resp (Redis protocol) parser?
I encountered hanging feof
resulting in StreamException("The stream was closed by the peer")
.
This happens when doing requests in Kubernetes to Facebook.
Omitting the feof check "fixes" it.
A workaround (and better code) would be to not check on @feof
before the write but setting an error handler for the fwrite
and cathing warnings and converting them to stream exceptions.
Example:
$eh = set_error_handler([$this, 'onSocketWriteError']);
if ($chunkSize) {
$written = \fwrite($stream, $data, $chunkSize);
} else {
$written = \fwrite($stream, $data);
}
set_error_handler($eh);
I needed to combine output of two input streams while not leaking null
from previous stream. I abandoned this idea after all, but code is already written and wonder if you would be interested in me to contribute it here? Usage is like following.
public function testInputStreamChain()
{
$chain = new InputStreamChain(new InMemoryStream('foo'), new InMemoryStream(null), new InMemoryStream('bar'));
self::assertEquals('foo', wait($chain->read()));
self::assertEquals(null, wait($chain->read()));
self::assertEquals('bar', wait($chain->read()));
}
Hi.
I think it's would be great to have possibility to read chunks with specified length since commonly dealing with binary protocols looks like
$lengthBytes = fread($fd, 4); // read Int32 Length
$length = unpack('Llength', $lengthBytes)['length'];
$packet = fread($fd, $length);
Not sure about possibility to make InputStream behave like public function read(?int $length = null): Promise;
, but maybe some buffering class similar to LineReader
would be useful, for example:
class LengthReader
{
public function __construct(InputStream $inputStream);
public function readLine(int $length): Promise;
}
After last commit, I get strange errors:
Amp\ByteStream\StreamException: Failed to write to socket Errno: 2; stream_socket_accept(): accept failed: Connection timed out
and
Amp\ByteStream\StreamException: Failed to write to socket Errno: 2; inet_pton(): Unrecognized address 127.0.0.1:11001
after some research i find that problems are from this:
https://github.com/amphp/byte-stream/blob/master/lib/ResourceOutputStream.php#L71
if I call it without $chunkSize
all is ok!
is this problem from PHP ?
I am using this:
PHP 7.1.3 (cli) (built: Mar 17 2017 13:43:06) ( NTS )
Only I get this bug ?
Can someone confirm this ?
I use Ridge V2 for RabbitMQ.
When I upgrade the package from version 12 to version v2.0.0-beta.14, message processing on the production server is greatly reduced (see point 1 in the figure). When I return version v2.0.0-beta.12 (see point 2 in the figure), the processing speed returns to the required value.
When file descriptors are shared between processes or threads, stream_socket_shutdown
will close the file descriptor for ALL threads/processes (took me forever to figure out what was going on) and cause broken pipes.
The ResourceInputStream
implementation is buggy, which results in InvalidWatcherError
s being thrown in amphp/http-client
.
AMP_DEBUG_TRACE_WATCHERS=true ./vendor/bin/phpunit test/ClientHttpBinIntegrationTest.php
PHPUnit 8.4.1 by Sebastian Bergmann and contributors.
.................................E 34 / 34 (100%)
Time: 31.25 seconds, Memory: 65.84 MB
There was 1 error:
1) Amp\Http\Client\ClientHttpBinIntegrationTest::testConcurrentSlowNetworkInterceptor
Amp\Loop\InvalidWatcherError: Cannot reference an invalid watcher identifier: 'o'
Creation Trace:
#0 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop.php:183 Amp\Loop\TracingDriver->onReadable()
#1 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/byte-stream/lib/ResourceInputStream.php:102 Amp\Loop::onReadable()
#2 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/socket/src/ResourceSocket.php:72 Amp\ByteStream\ResourceInputStream->__construct()
#3 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/socket/src/ResourceSocket.php:40 Amp\Socket\ResourceSocket->__construct()
#4 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/socket/src/DnsConnector.php:122 Amp\Socket\ResourceSocket::fromClientSocket()
#5 Amp\Socket\DnsConnector::Amp\Socket\{closure}()
#6 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Coroutine.php:105 Generator->send()
#7 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Internal/Placeholder.php:43 Amp\Coroutine->Amp\{closure}()
#8 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Internal/PrivatePromise.php:23 class@anonymous/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Deferred.php0x7fe1ece202d7->onResolve()
#9 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Internal/Placeholder.php:125 Amp\Internal\PrivatePromise->onResolve()
#10 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Deferred.php:45 class@anonymous/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Deferred.php0x7fe1ece202d7->resolve()
#11 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/functions.php:233 Amp\Deferred->resolve()
#12 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Internal/Placeholder.php:130 Amp\Promise\{closure}()
#13 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Deferred.php:45 class@anonymous/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Deferred.php0x7fe1ece202d7->resolve()
#14 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/NativeDriver.php:229 Amp\Deferred->resolve()
#15 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/NativeDriver.php:97 Amp\Loop\NativeDriver->selectStreams()
#16 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/Driver.php:134 Amp\Loop\NativeDriver->dispatch()
#17 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/Driver.php:72 Amp\Loop\Driver->tick()
#18 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/TracingDriver.php:22 Amp\Loop\Driver->run()
#19 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop.php:84 Amp\Loop\TracingDriver->run()
#20 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/phpunit-util/src/AsyncTestCase.php:64 Amp\Loop::run()
#21 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestCase.php:1400 Amp\PHPUnit\AsyncTestCase->runAsyncTest()
#22 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/phpunit-util/src/AsyncTestCase.php:36 PHPUnit\Framework\TestCase->runTest()
#23 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestCase.php:1020 Amp\PHPUnit\AsyncTestCase->runTest()
#24 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestResult.php:691 PHPUnit\Framework\TestCase->runBare()
#25 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestCase.php:752 PHPUnit\Framework\TestResult->run()
#26 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestSuite.php:569 PHPUnit\Framework\TestCase->run()
#27 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/TextUI/TestRunner.php:616 PHPUnit\Framework\TestSuite->run()
#28 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/TextUI/Command.php:200 PHPUnit\TextUI\TestRunner->doRun()
#29 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/TextUI/Command.php:159 PHPUnit\TextUI\Command->run()
#30 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/phpunit:61 PHPUnit\TextUI\Command::main()
Cancel Trace:
#0 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop.php:294 Amp\Loop\TracingDriver->cancel()
#1 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/byte-stream/lib/ResourceInputStream.php:141 Amp\Loop::cancel()
#2 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/socket/src/ResourceSocket.php:140 Amp\ByteStream\ResourceInputStream->read()
#3 /home/kelunik/GitHub/amphp/http-client/src/Connection/Http2Connection.php:469 Amp\Socket\ResourceSocket->read()
#4 Amp\Http\Client\Connection\Http2Connection->run()
#5 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Coroutine.php:105 Generator->send()
#6 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Internal/Placeholder.php:130 Amp\Coroutine->Amp\{closure}()
#7 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Deferred.php:45 class@anonymous/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Deferred.php0x7fe1ece202d7->resolve()
#8 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/byte-stream/lib/ResourceInputStream.php:101 Amp\Deferred->resolve()
#9 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/NativeDriver.php:198 Amp\ByteStream\ResourceInputStream::Amp\ByteStream\{closure}()
#10 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/NativeDriver.php:97 Amp\Loop\NativeDriver->selectStreams()
#11 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/Driver.php:134 Amp\Loop\NativeDriver->dispatch()
#12 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/Driver.php:72 Amp\Loop\Driver->tick()
#13 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/TracingDriver.php:22 Amp\Loop\Driver->run()
#14 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop.php:84 Amp\Loop\TracingDriver->run()
#15 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/phpunit-util/src/AsyncTestCase.php:64 Amp\Loop::run()
#16 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestCase.php:1400 Amp\PHPUnit\AsyncTestCase->runAsyncTest()
#17 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/phpunit-util/src/AsyncTestCase.php:36 PHPUnit\Framework\TestCase->runTest()
#18 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestCase.php:1020 Amp\PHPUnit\AsyncTestCase->runTest()
#19 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestResult.php:691 PHPUnit\Framework\TestCase->runBare()
#20 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestCase.php:752 PHPUnit\Framework\TestResult->run()
#21 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestSuite.php:569 PHPUnit\Framework\TestCase->run()
#22 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/TextUI/TestRunner.php:616 PHPUnit\Framework\TestSuite->run()
#23 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/TextUI/Command.php:200 PHPUnit\TextUI\TestRunner->doRun()
#24 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/TextUI/Command.php:159 PHPUnit\TextUI\Command->run()
#25 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/phpunit:61 PHPUnit\TextUI\Command::main()
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/TracingDriver.php:132
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop.php:311
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/byte-stream/lib/ResourceInputStream.php:225
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/socket/src/ResourceSocket.php:170
/home/kelunik/GitHub/amphp/http-client/src/Connection/Http2Connection.php:333
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Coroutine.php:60
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/functions.php:66
/home/kelunik/GitHub/amphp/http-client/src/Connection/Http2Connection.php:435
/home/kelunik/GitHub/amphp/http-client/src/Connection/HttpStream.php:49
/home/kelunik/GitHub/amphp/http-client/src/Internal/InterceptedStream.php:44
/home/kelunik/GitHub/amphp/http-client/src/Interceptor/ModifyRequest.php:31
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Coroutine.php:105
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Internal/Placeholder.php:130
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Coroutine.php:110
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Internal/Placeholder.php:130
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Delayed.php:22
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/TracingDriver.php:47
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/NativeDriver.php:121
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/Driver.php:134
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/Driver.php:72
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/TracingDriver.php:22
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop.php:84
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/phpunit-util/src/AsyncTestCase.php:64
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/phpunit-util/src/AsyncTestCase.php:36
ERRORS!
Tests: 34, Assertions: 72, Errors: 1.
Hi,
Is it possible to have a duplex stream? A typical use case is sharing a stream and having one side reading from it, while the other is writing to it. Quite similar to Golang's channels.
When trying to do a simple HTTP request with HttpClient
, I invariable receive a DnsException
thrown by Amp\Dns\Rfc1035StubResolver:197
, which is part of a MultiReasonException
containing two such exceptions, whose previous exception is an AssertionError
from byte-stream\lib\ResourceInputStream.php:135
where an assertion containing the following message fails:
Trying to read from a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to.
amphp/amp v2.5.2
amphp/beanstalk v0.3.2
amphp/byte-stream v1.8.1
amphp/cache v1.4.0
amphp/dns v1.2.3
amphp/hpack v3.1.0
amphp/http v1.6.3
amphp/http-client v4.5.5
amphp/http-client-cookies v1.1.0
amphp/parallel v1.4.0
amphp/parser v1.0.0
amphp/postgres v1.3.3
amphp/process v1.1.1
amphp/serialization v1.0.0
amphp/socket v1.1.3
amphp/sql v1.0.1
amphp/sql-common v1.1.2
amphp/sync v1.4.0
amphp/uri v0.1.4
amphp/windows-registry v0.3.3
I'm getting the following exception while having a look at amphp/socket#32:
PHP Fatal error: Uncaught Amp\Loop\InvalidWatcherError: Cannot enable an invalid watcher identifier: 'byb' in /home/kelunik/GitHub/amphp/socket/vendor/amphp/amp/lib/Loop/Driver.php:369
Stack trace:
#0 /home/kelunik/GitHub/amphp/socket/vendor/amphp/amp/lib/Loop.php(210): Amp\Loop\Driver->enable('byb')
#1 /home/kelunik/GitHub/amphp/socket/vendor/amphp/byte-stream/lib/ResourceOutputStream.php(188): Amp\Loop::enable('byb')
#2 /home/kelunik/GitHub/amphp/socket/vendor/amphp/byte-stream/lib/ResourceOutputStream.php(130): Amp\ByteStream\ResourceOutputStream->send('test', false)
#3 /home/kelunik/GitHub/amphp/socket/lib/Socket.php(67): Amp\ByteStream\ResourceOutputStream->write('test')
#4 /home/kelunik/GitHub/amphp/socket/test.php(35): Amp\Socket\Socket->write('test')
#5 /home/kelunik/GitHub/amphp/socket/vendor/amphp/amp/lib/Loop/NativeDriver.php(101): {closure}('byc', NULL)
#6 /home/kelunik/GitHub/amphp/socket/vendor/amphp/amp/lib/Loop/Driver.php(130): Amp\Loop\NativeDriver->dispatch(true)
#7 /home/kelunik/GitHub/amphp/socket/vendor/a in /home/kelunik/GitHub/amphp/socket/vendor/amphp/amp/lib/Loop/Driver.php on line 369
server.php
<?php
require __DIR__ . "/vendor/autoload.php";
use Amp\Socket\ServerSocket;
use function Amp\asyncCoroutine;
\Amp\Loop::run(function () {
\Amp\Loop::repeat(5 * 1000, function () {
echo "current RAM: " . round((memory_get_usage() / 1024 / 1024), 2) . "MB, pick RAM: " . round((memory_get_peak_usage() / 1024 / 1024), 2) . "MB\n";
});
$clientHandler = asyncCoroutine(function (ServerSocket $client) {
echo "client connected" . PHP_EOL;
while (($chunk = yield $client->read()) !== null) {
echo "client data -> " . strlen($chunk) . PHP_EOL;
}
echo "client disonnected" . PHP_EOL;
});
$server = \Amp\Socket\listen("127.0.0.1:12001");
while ($client = yield $server->accept()) {
// $clientHandler($client);
}
});
client.php
<?php
require __DIR__ . "/vendor/autoload.php";
\Amp\Loop::run(function () {
echo "Driver -> " . get_class(\Amp\Loop::get()) . "\n";
\Amp\Loop::repeat(5 * 1000, function () {
echo "current RAM: " . round((memory_get_usage() / 1024 / 1024), 2) . "MB, pick RAM: " . round((memory_get_peak_usage() / 1024 / 1024), 2) . "MB\n";
});
for ($i = 0; $i < 1000; $i++) {
client($i);
}
});
/**
* @param $i
*
* @return \Amp\Promise<\Amp\Socket\ClientSocket>
*/
function client($i)
{
return \Amp\call(function () use ($i) {
/**
* @var $client \Amp\Socket\ClientSocket
*/
$client = yield \Amp\Socket\connect("127.0.0.1:12001");
\Amp\Loop::repeat(1000, function () use ($client, $i) {
$data = "test";
echo "write $i data -> " . strlen($data) . PHP_EOL;
$client->write($data);
});
return $client;
});
}
A tight loop of reading and writing with data always being available and always writable, might result in a blocking loop. We should discuss measures to mitigate that, e.g. scheduling immediate write with a Loop::defer()
in case many of them happen in a tight loop.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.