
aiofile's Introduction

AIOFile


Real asynchronous file operations with asyncio support.

Status

Development - Stable

Features

  • Since version 2.0.0, aiofile is based on caio, which contains a Linux libaio implementation and two thread-based implementations (C-based and pure Python).
  • AIOFile has no internal pointer. You should pass offset and chunk_size for each operation, or use the helpers (Reader or Writer). The simplest way is to use async_open, which creates an object with a file-like interface.
  • On Linux, the implementation based on libaio is used.
  • On POSIX systems (macOS, and optionally Linux), the implementation based on a thread pool is used.
  • Otherwise, the pure-Python thread-based implementation is used.
  • The implementation is chosen automatically depending on system compatibility.

Limitations

  • The Linux native AIO implementation is not able to open special files. Asynchronous operations against special filesystems like /proc/ and /sys/ are not supported by the kernel. This is not an aiofile or caio issue. In these cases you can switch to the thread-based implementations (see the troubleshooting section). However, on supported file systems the Linux implementation has lower overhead and is preferred, although it is not a silver bullet.

Code examples

All code examples require Python 3.6+.

High-level API

async_open helper

This helper mimics Python file objects: it returns a file-like object with similar, but asynchronous, methods.

Supported methods:

  • async def read(length = -1) - read a chunk from the file; when length is -1, the file is read to the end.

  • async def write(data) - write a chunk to the file

  • def seek(offset) - set the file pointer position

  • def tell() - return the current file pointer position

  • async def readline(size=-1, newline="\n") - read chunks until a newline or EOF. Since version 3.7.0, __aiter__ returns a LineReader.

    This method is suboptimal for small lines because it doesn't reuse the read buffer. When you want to read a file line by line, avoid using async_open; use LineReader instead.

  • def __aiter__() -> LineReader - iterator over lines.

  • def iter_chunked(chunk_size: int = 32768) -> Reader - iterator over chunks.

  • .file property contains the underlying AIOFile object

Basic example:

import asyncio
from pathlib import Path
from tempfile import gettempdir

from aiofile import async_open

tmp_filename = Path(gettempdir()) / "hello.txt"

async def main():
    async with async_open(tmp_filename, 'w+') as afp:
        await afp.write("Hello ")
        await afp.write("world")
        afp.seek(0)

        print(await afp.read())

        await afp.write("Hello from\nasync world")
        print(await afp.readline())
        print(await afp.readline())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Example without context manager:

import asyncio
import atexit
import os
from tempfile import mktemp

from aiofile import async_open


TMP_NAME = mktemp()
atexit.register(os.unlink, TMP_NAME)


async def main():
    afp = await async_open(TMP_NAME, "w")
    await afp.write("Hello")
    await afp.close()


asyncio.run(main())
assert open(TMP_NAME, "r").read() == "Hello"

Concatenate example program (cat):

import asyncio
import sys
from argparse import ArgumentParser
from pathlib import Path

from aiofile import async_open

parser = ArgumentParser(
    description="Read files line by line using asynchronous io API"
)
parser.add_argument("file_name", nargs="+", type=Path)

async def main(arguments):
    for src in arguments.file_name:
        async with async_open(src, "r") as afp:
            async for line in afp:
                sys.stdout.write(line)


asyncio.run(main(parser.parse_args()))

Copy file example program (cp):

import asyncio
from argparse import ArgumentParser
from pathlib import Path

from aiofile import async_open

parser = ArgumentParser(
    description="Copying files using asynchronous io API"
)
parser.add_argument("source", type=Path)
parser.add_argument("dest", type=Path)
parser.add_argument("--chunk-size", type=int, default=65535)


async def main(arguments):
    async with async_open(arguments.source, "rb") as src, \
               async_open(arguments.dest, "wb") as dest:
        async for chunk in src.iter_chunked(arguments.chunk_size):
            await dest.write(chunk)


asyncio.run(main(parser.parse_args()))

Example with opening already open file pointer:

import asyncio
from typing import IO, Any
from aiofile import async_open


async def main(fp: IO[Any]):
    async with async_open(fp) as afp:
        await afp.write("Hello from\nasync world")
        print(await afp.readline())


with open("test.txt", "w+") as fp:
    asyncio.run(main(fp))

Linux native AIO doesn't support reading and writing special files (e.g. procfs/sysfs/unix pipes/etc.), but you can perform operations on these files using compatible context objects:

import asyncio
from aiofile import async_open
from caio import thread_aio_asyncio
from contextlib import AsyncExitStack


async def main():
    async with AsyncExitStack() as stack:

        # Custom context should be reused
        ctx = await stack.enter_async_context(
            thread_aio_asyncio.AsyncioContext()
        )

        # Open special file with custom context
        src = await stack.enter_async_context(
            async_open("/proc/cpuinfo", "r", context=ctx)
        )

        # Open regular file with default context
        dest = await stack.enter_async_context(
            async_open("/tmp/cpuinfo", "w")
        )

        # Copying file content line by line
        async for line in src:
            await dest.write(line)


asyncio.run(main())

Low-level API

The AIOFile class is a low-level interface for asynchronous file operations; the read and write methods accept an offset (in bytes, default 0) at which the operation will be performed.

This allows you to perform many independent IO operations on a file that is opened once, without moving a virtual file pointer.

For example, you can make 10 concurrent HTTP requests specifying the Range header and asynchronously write to a single opened file; the offsets must either be calculated manually, or you can use 10 instances of Writer with specified initial offsets (see the sketch after the example below).

For sequential reading and writing there are Writer, Reader and LineReader. Keep in mind that async_open is not the same as AIOFile: it provides a similar interface for file operations, simulating methods like read and write as they are implemented in the built-in open.

import asyncio
from aiofile import AIOFile


async def main():
    async with AIOFile("hello.txt", 'w+') as afp:
        payload = "Hello world\n"

        await asyncio.gather(
            *[afp.write(payload, offset=i * len(payload)) for i in range(10)]
        )

        await afp.fsync()

        assert await afp.read(len(payload) * 10) == payload * 10

asyncio.run(main())
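The example above schedules concurrent writes by computing each offset by hand. The same idea, as mentioned above, can be expressed with several Writer instances that each start at their own initial offset. A minimal sketch, assuming Writer accepts an initial offset and advances it after every write:

import asyncio
from aiofile import AIOFile, Writer


async def fill_region(writer: Writer, payload: str, count: int):
    # Each writer appends sequentially inside its own region of the file.
    for _ in range(count):
        await writer(payload)


async def main():
    payload = "Hello world\n"
    region_size = len(payload) * 5

    async with AIOFile("hello.txt", "w+") as afp:
        writers = [Writer(afp, offset=i * region_size) for i in range(10)]
        await asyncio.gather(*[fill_region(w, payload, 5) for w in writers])
        await afp.fsync()


asyncio.run(main())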

The low-level API is, in fact, just a slightly sugared caio API.

import asyncio
from aiofile import AIOFile


async def main():
    async with AIOFile("/tmp/hello.txt", 'w+') as afp:
        await afp.write("Hello ")
        await afp.write("world", offset=7)
        await afp.fsync()

        print(await afp.read())


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Reader and Writer

When you want to read or write a file linearly, the following example might be helpful.

import asyncio
from aiofile import AIOFile, Reader, Writer


async def main():
    async with AIOFile("/tmp/hello.txt", 'w+') as afp:
        writer = Writer(afp)
        reader = Reader(afp, chunk_size=8)

        await writer("Hello")
        await writer(" ")
        await writer("World")
        await afp.fsync()

        async for chunk in reader:
            print(chunk)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

LineReader - read file line by line

LineReader is a helper that is very effective when you want to read a file linearly and line by line.

It reads the file into an internal buffer chunk by chunk and searches the buffer for lines.

The default chunk size is 4KB.

import asyncio
from aiofile import AIOFile, LineReader, Writer


async def main():
    async with AIOFile("/tmp/hello.txt", 'w+') as afp:
        writer = Writer(afp)

        await writer("Hello")
        await writer(" ")
        await writer("World")
        await writer("\n")
        await writer("\n")
        await writer("From async world")
        await afp.fsync()

        async for line in LineReader(afp):
            print(line)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

When you want to read a file line by line, avoid using async_open; use LineReader instead.

More examples

Useful examples with aiofile

Async CSV Dict Reader

import asyncio
import io
from csv import DictReader

from aiofile import AIOFile, LineReader


class AsyncDictReader:
    def __init__(self, afp, **kwargs):
        self.buffer = io.BytesIO()
        self.file_reader = LineReader(
            afp, line_sep=kwargs.pop('line_sep', '\n'),
            chunk_size=kwargs.pop('chunk_size', 4096),
            offset=kwargs.pop('offset', 0),
        )
        self.reader = DictReader(
            io.TextIOWrapper(
                self.buffer,
                encoding=kwargs.pop('encoding', 'utf-8'),
                errors=kwargs.pop('errors', 'replace'),
            ), **kwargs,
        )
        self.line_num = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.line_num == 0:
            header = await self.file_reader.readline()
            self.buffer.write(header)

        line = await self.file_reader.readline()

        if not line:
            raise StopAsyncIteration

        self.buffer.write(line)
        self.buffer.seek(0)

        try:
            result = next(self.reader)
        except StopIteration as e:
            raise StopAsyncIteration from e

        self.buffer.seek(0)
        self.buffer.truncate(0)
        self.line_num = self.reader.line_num

        return result


async def main():
    async with AIOFile('sample.csv', 'rb') as afp:
        async for item in AsyncDictReader(afp):
            print(item)


asyncio.run(main())

Troubleshooting

The caio Linux implementation works well on modern Linux kernel versions and file systems, so you may encounter problems specific to your environment. This is not a bug, and it can be resolved in several ways:

  1. Upgrade the kernel
  2. Use a compatible file system
  3. Use the thread-based or pure-Python implementation

Since version 0.7.0, caio provides several ways to do this.

1. At runtime, set the environment variable CAIO_IMPL to one of the following values (a sketch of setting this from Python appears after the examples in this section):

  • linux - use the native Linux kernel AIO mechanism
  • thread - use the thread-based implementation written in C
  • python - use the pure-Python implementation

2. The file default_implementation, located next to __init__.py in the caio installation path. It's useful for distribution package maintainers. This file may contain comments (lines starting with the # symbol), and the first line should be one of linux, thread or python.

3. You can also manage contexts manually:
import asyncio

from aiofile import async_open
from caio import linux_aio_asyncio, thread_aio_asyncio


async def main():
    linux_ctx = linux_aio_asyncio.AsyncioContext()
    threads_ctx = thread_aio_asyncio.AsyncioContext()

    async with async_open("/tmp/test.txt", "w", context=linux_ctx) as afp:
        await afp.write("Hello")

    async with async_open("/tmp/test.txt", "r", context=threads_ctx) as afp:
        print(await afp.read())


asyncio.run(main())
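If you prefer to select the implementation from Python code rather than in the shell, the CAIO_IMPL variable has to be set early. A minimal sketch, assuming caio reads CAIO_IMPL when it is first imported:

import os

# Assumption: caio picks its implementation at import time,
# so CAIO_IMPL must be set before aiofile/caio are imported.
os.environ["CAIO_IMPL"] = "thread"  # or "linux" / "python"

import asyncio
from aiofile import async_open


async def main():
    async with async_open("/tmp/hello.txt", "w") as afp:
        await afp.write("Hello from the thread-based implementation\n")


asyncio.run(main())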

aiofile's People

Contributors

cclauss, h4l, honglei, leenr, lemurchik, mosquito, peterdeme, radzak, setop, shaunc, sudoguy, yifeikong


aiofile's Issues

Some sort of changelog / release notes?

Just an idea/request, please close if you don't care:

As somebody who uses aiofile a lot, it would be helpful to see a changelog or release notes somewhere. Wouldn't even need to be anything detailed, just maybe a mention of methods that have changed or compatibility that has been added/dropped.

Currently, when I'm upgrading I'm scrolling through the commits and hope that I'd catch things that are important to me. Definitely doable, but I'm sure I'm missing stuff this way.

Async csv.DictReader

DictReader can take any iterator that defines __next__, but not an async generator.

It would be great to have an aiofile.DictReader for that.

Here is the workaround:

import csv
import io

raw_bytes = await stream.read(10000000)
wrapped_bytes = io.BytesIO(raw_bytes)
text = io.TextIOWrapper(wrapped_bytes, encoding=encoding, errors='replace')

for i, row in enumerate(csv.DictReader(text)):
    print(row)

async_open does not create file if file does not exist in mode 'a+'

Hi, I use async_open to write a file, but it does not create the file in mode 'a+' if the file does not exist.
I report this issue because the builtin open will create the file in mode 'a+' if it does not exist.

The mode 'w+' will create the file if it does not exist, but it is not append mode.

async with async_open(p, 'w+') as af:
        print('222')
        await af.write('12345\n')
        await af.write('67890\n')

The mode 'a+' is append mode, but it needs the file to exist.

async with async_open(p, 'a+') as af:
        print('222')
        await af.write('12345\n')
        await af.write('67890\n')
Traceback (most recent call last):
  File "C:\Users\rondou.chen\workspace\system-team-tools\dump_edge_sys_logs\dump_logcat.py", line 66, in <module>
    asyncio.run(main())
  File "C:\Users\rondou.chen\AppData\Local\Programs\Python\Python310\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Users\rondou.chen\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 641, in run_until_complete
    return future.result()
  File "C:\Users\rondou.chen\workspace\system-team-tools\dump_edge_sys_logs\dump_logcat.py", line 61, in main
    await asyncio.gather(*[task2,], return_exceptions=False)
  File "C:\Users\rondou.chen\workspace\system-team-tools\dump_edge_sys_logs\dump_logcat.py", line 37, in write_log
    async with async_open(p, 'a+') as af:
  File "C:\Users\rondou.chen\workspace\system-team-tools\.venv\lib\site-packages\aiofile\utils.py", line 334, in async_open
    return TextFileWrapper(afp)
  File "C:\Users\rondou.chen\workspace\system-team-tools\.venv\lib\site-packages\aiofile\utils.py", line 257, in __init__
    super().__init__(afp)
  File "C:\Users\rondou.chen\workspace\system-team-tools\.venv\lib\site-packages\aiofile\utils.py", line 167, in __init__
    self._offset = os.stat(afp.name).st_size
FileNotFoundError: [WinError 2] No such file or directory: 'C:\\Users\\rondou.chen\\workspace\\system-team-tools\\logcat_logs.txt'
(.venv) PS C:\Users\rondou.chen\workspace\system-team-tools>

AIOFile should support the wave library

We are using Yandex Cloud Speech, specifically with the lpcm codec.

https://cloud.yandex.ru/docs/speechkit/tts/request

Here is the code from the example:

with wave.open(test_filepath, 'wb') as file_obj:
    file_obj.setparams((1, 2, 8000, 0, 'NONE', 'NONE'))
    file_obj.writeframes(content)

Could you add support for the wave library?

For instance, like this:

async with AIOWaveFile("/tmp/test.wav", 'wb') as afp:
        await afp.setparams((1, 2, 8000, 0, 'NONE', 'NONE'))
        await afp.write(content)
        await afp.fsync()

Race condition in `AIOFile.open()`

AIOFile.open() does not prevent concurrent open() calls from each opening the file. As a result, the following scenario can occur:

  • An AIOFile object is opened by task A, resulting in FD (file descriptor) A
  • Meanwhile, task B calls open()
  • Task A starts an operation on the AIOFile object using FD A.
  • Task B's open() call completes, overwriting the AIOFile._file_obj created by task A's open() call, causing it to be GC'd and FD A to be closed
  • Task A's operation using FD A now fails because FD A is closed

This program reproduces this behaviour:

import asyncio
import os
import tempfile

from aiofile import AIOFile


async def use_file(label: str, af: AIOFile) -> None:
    fd = await af.open()
    if fd is None:
        print(f"{label}: file already open")
    else:
        print(f"{label}: opened file: {fd=}")
    try:
        await af.fsync()  # could be anything
        print(f"{label}: fsync() succeeded")
    except Exception as error:
        print(
            f"{label}: fsync() failed: opened fd={fd}, current AIOFile fd={af.fileno()}, {error=}"
        )


async def main():
    fd, name = tempfile.mkstemp()
    os.close(fd)
    af = AIOFile(name)
    await asyncio.gather(*(use_file(f"task {n}", af) for n in range(10)))


if __name__ == "__main__":
    asyncio.run(main())
$ python aiofile_open_bug.py
task 0: opened file: fd=8
task 1: opened file: fd=9
task 2: opened file: fd=10
task 3: opened file: fd=11
task 5: opened file: fd=13
task 7: opened file: fd=15
task 4: opened file: fd=12
task 0: fsync() failed: opened fd=8, current AIOFile fd=12, error=OSError(9, 'Bad file descriptor')
task 6: opened file: fd=14
task 8: opened file: fd=8
task 9: opened file: fd=16
task 1: fsync() failed: opened fd=9, current AIOFile fd=16, error=OSError(9, 'Bad file descriptor')
task 2: fsync() failed: opened fd=10, current AIOFile fd=16, error=OSError(9, 'Bad file descriptor')
task 3: fsync() failed: opened fd=11, current AIOFile fd=16, error=OSError(9, 'Bad file descriptor')
task 5: fsync() failed: opened fd=13, current AIOFile fd=16, error=OSError(9, 'Bad file descriptor')
task 7: fsync() failed: opened fd=15, current AIOFile fd=16, error=OSError(9, 'Bad file descriptor')
task 4: fsync() failed: opened fd=12, current AIOFile fd=16, error=OSError(9, 'Bad file descriptor')
task 6: fsync() failed: opened fd=14, current AIOFile fd=16, error=OSError(9, 'Bad file descriptor')
task 8: fsync() failed: opened fd=8, current AIOFile fd=16, error=OSError(9, 'Bad file descriptor')
task 9: fsync() succeeded

Russian Symbols in Aiofile writer

import asyncio
from datetime import datetime
from aiofile import AIOFile, Writer


def add_timestamp(log_line):
    timestamp = datetime.strftime(datetime.now(), "[%d.%m.%y %H:%M]")
    return f"{timestamp} {log_line}"


async def chat_logging():
    async with AIOFile("log.txt", 'w') as afp:
        writer = Writer(afp)
        for _ in range(10):
            log_line = "Hello world\n"
            # log_line = "Hello ะผะฐะน ะฒะพั€ะปะด\n"
            print(add_timestamp(log_line))
            await writer(add_timestamp(log_line))
 
 
async def main():
    await asyncio.gather(chat_logging())

asyncio.run(main())

ะ•ัะปะธ ะฒ ะทะฐะฟะธัั‹ะฒะฐะตะผะพะน ัั‚ั€ะพะบะต ะตัั‚ัŒ ั€ัƒััะบะธะต ะฑัƒะบะฒั‹, ั‚ะพ ะฒ log.txt ัะพั…ั€ะฐะฝัะตั‚ัั ะบะฐัˆะฐ. ะ•ัะปะธ ะฒัะต ะฑัƒะบะฒั‹ ะฐะฝะณะปะธะนัะบะธะต, ั‚ะพ ัั‚ั€ะพั‡ะบะธ ัะพั…ั€ะฐะฝััŽั‚ัั ะฝะพั€ะผะฐะปัŒะฝะพ. ะ’ะพะทะผะพะถะฝะพ, ะฝะตะฟั€ะฐะฒะธะปัŒะฝะพ ะฒั‹ัั‡ะธั‚ั‹ะฒะฐะตั‚ัั ัะผะตั‰ะตะฝะธะต ะฒ ั„ะฐะนะปะต.
OS Linux Debian 8 / Python 3.7.4 / aiofile==1.5.2

Function not implemented

Description

Can't use in Docker image (FROM tiangolo/uvicorn-gunicorn-fastapi:python3.8-slim)

Traceback

     async with async_open(f"{folder}/{filename}", "w+") as afp:
 /usr/local/lib/python3.8/site-packages/aiofile/utils.py:327: in async_open
     afp = AIOFile(str(file_specifier), mode, *args, **kwargs)
 /usr/local/lib/python3.8/site-packages/aiofile/aio.py:127: in __init__
     self.__context = context or get_default_context()
 /usr/local/lib/python3.8/site-packages/aiofile/aio.py:316: in get_default_context
     return create_context()
 /usr/local/lib/python3.8/site-packages/aiofile/aio.py:298: in create_context
     context = caio.AsyncioContext(max_requests, loop=loop)
 /usr/local/lib/python3.8/site-packages/caio/asyncio_base.py:22: in __init__
     self.context = self._create_context(max_requests, **kwargs)
 /usr/local/lib/python3.8/site-packages/caio/linux_aio_asyncio.py:10: in _create_context
     context = super()._create_context(max_requests)
 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
 
 self = <caio.linux_aio_asyncio.AsyncioContext object at 0x4035c58a30>
 max_requests = 512, kwargs = {}
 
     def _create_context(self, max_requests, **kwargs):
 >       return self.CONTEXT_CLASS(max_requests=max_requests, **kwargs)
 E       SystemError: (38, 'Function not implemented')
 
 /usr/local/lib/python3.8/site-packages/caio/asyncio_base.py:25: SystemError

Versions

python==3.8
aiofile==3.7.2
caio==0.9.3

Can I use aiofile.async_open without a with statement?

Python's open may be used directly, or as a context manager in a with block.
I love context managers, but they're not always the right tool, because cleanup on block exit is not always the right time to perform cleanup.
Can I use aiofile.async_open without a with statement?

import asyncio

import aiofile

async def test():
    f = aiofile.async_open('filename.txt', mode='w')
    try:
        await f.write('123') #  asyncio.base_futures.InvalidStateError: AIOFile closed
    finally:
        pass
        # await f.close()
asyncio.run(test())
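For reference, the README's "Example without context manager" above shows the working pattern: async_open returns an awaitable, so it must be awaited when used outside a with block. A minimal sketch of the corrected snippet:

import asyncio

import aiofile


async def test():
    # Awaiting async_open (instead of calling it directly) actually opens the file
    f = await aiofile.async_open('filename.txt', mode='w')
    try:
        await f.write('123')
    finally:
        await f.close()


asyncio.run(test())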

Missing posix_aio.c

Installing via the git repository fails due to the above missing file. AFAICT it's not in the repository.

I just realized 0.5.2 is on pypi but it would be nice to have this as a backup.

Release a new version on Pip

Since most users prefer pip to install packages, please release an updated version on pip.
For example, the following problem is already solved but remains in the pip version:

#35

Segmentation fault

I've prepared the following code to represent the exception:

import asyncio
import os
import sys

import aiofile

fifo = '/tmp/test.fifo'
if not os.path.exists(fifo):
    os.mkfifo(fifo)


async def read():
    print('Reading')
    async with aiofile.AIOFile(fifo, 'r') as afp:
        while True:
            data = await afp.read(4096)
            if not data:
                break
    print('Reading done')


async def write():
    print('Writing')
    async with aiofile.AIOFile(fifo, 'w') as afp:
        await afp.write('test')
        await afp.fsync()
    print('Writing done')


async def main():
    arg = sys.argv[1]
    if arg == 'reader':
        await read()
    elif arg == 'writer':
        await write()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Execution:

$ python test.py reader &
[1] 30822
$ Reading

$ python test.py writer
Writing
Segmentation fault
$ Reading done

Is it a bug, or am I doing something wrong?

Add support for StringIO and BytesIO

Hello,
Thanks for the great package!

I want to make a feature request for adding support for aiofile's equivalent of StringIO and BytesIO from the standard Python library. These would be particularly useful when writing tests, as test doubles. Currently, the only way to test functions that use aiofile is to create temporary files and wrap them in AIOFile.

Tag the source

It would be very helpful if you could tag releases as well again. This would enable distributions to fetch the package from GitHub instead of PyPI.

Thanks

aio is not real asynchronous file IO

aio (POSIX asynchronous IO), which is used by this package aiofile, is not real asynchronous file IO. You can find these sentences in the manpage (man 7 aio):

The current Linux POSIX AIO implementation is provided in user space by glibc. This has a number of limitations, most notably that maintaining multiple threads to perform I/O operations is expensive and scales poorly.

It's just based on user-space threads, which is no different from simply using a thread pool to read files in Python.

SystemError: (22, 'Invalid argument') occurs when reading/writing on Raspberry PI OS

I get the following error when running the async_open example.

Traceback (most recent call last):
  File "/home/pi/.local/lib/python3.7/site-packages/caio/asyncio_base.py", line 43, in step
    self.context.submit(*operations.keys())
SystemError: (22, 'Invalid argument')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "async_open_test.py", line 18, in <module>
    loop.run_until_complete(main())
  File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "async_open_test.py", line 7, in main
    await afp.write("Hello ")
  File "/home/pi/.local/lib/python3.7/site-packages/aiofile/utils.py", line 284, in write
    await operation
  File "/home/pi/.local/lib/python3.7/site-packages/aiofile/aio.py", line 243, in write_bytes
    data[written:], self.fileno(), offset + written,
  File "/home/pi/.local/lib/python3.7/site-packages/caio/asyncio_base.py", line 87, in submit
    await future
  File "/home/pi/.local/lib/python3.7/site-packages/caio/asyncio_base.py", line 48, in step
    self.context.submit(operation)
SystemError: (22, 'Invalid argument')

Versions:

  • aiofile==3.3.3
  • caio==0.6.3
  • Python 3.7.3
  • libaio1 0.3.112-3 (wasn't installed initially)
  • Linux raspberrypi 5.4.79-v7l+ #1373 SMP Mon Nov 23 13:27:40 GMT 2020 armv7l GNU/Linux

Am I missing something?

When running on AWS Lambda, sometimes I get SystemError: (11, 'Resource temporarily unavailable')

Not sure if it's an AWS environment problem or an aiofile problem. I'm running a Python 3.8, aiofile 3.3.3, asyncio + uvloop 0.14.0 cron job that runs every minute and opens a YAML file in the same package.

Sometimes I get this traceback:

  File "/var/task/util/track_errors.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/var/task/cron_jobs/process_notification_webhook.py", line 64, in handler
    return asyncio.run(main(event))
  File "/var/lang/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "uvloop/loop.pyx", line 1456, in uvloop.loop.Loop.run_until_complete
    return future.result()
  File "/var/task/cron_jobs/process_notification_webhook.py", line 12, in main
    async with AsyncDynamodbTable(
  File "/var/task/fluxo_aws/async_dynamodb_table.py", line 54, in __aenter__
    async with async_open(self.schema_path, "r") as opened_file:
  File "/var/task/aiofile/utils.py", line 315, in async_open
    afp = AIOFile(str(file_name), mode, *args, **kwargs)
  File "/var/task/aiofile/aio.py", line 116, in __init__
    self.__context = context or get_default_context()
  File "/var/task/aiofile/aio.py", line 289, in get_default_context
    return create_context()
  File "/var/task/aiofile/aio.py", line 276, in create_context
    context = caio.AsyncioContext(max_requests, loop=loop)
  File "/var/task/caio/asyncio_base.py", line 23, in __init__
    self.context = self._create_context(
  File "/var/task/caio/linux_aio_asyncio.py", line 10, in _create_context
    context = super()._create_context(max_requests)
  File "/var/task/caio/asyncio_base.py", line 62, in _create_context
    return self.CONTEXT_CLASS(max_requests=max_requests, **kwargs)
SystemError: (11, 'Resource temporarily unavailable')

The method that opens the file into /var/task/fluxo_aws/async_dynamodb_table.py is:

    async def __aenter__(self):
        self.client = await aioboto3.client("dynamodb").__aenter__()
        self.resource = await aioboto3.resource("dynamodb").__aenter__()
        self.table = await self.resource.Table(self.table_name)
        if self.schema_path:
            async with async_open(self.schema_path, "r") as opened_file:
                file_content = await opened_file.read(length=-1)
                self.schema = yaml.safe_load(file_content)
                self._build_validator()

        return self

This is an open source helper for AWS that I made for my projects, that you can find here: https://github.com/Fluxo-Resultados/fluxo-aws/blob/master/fluxo_aws/async_dynamodb_table.py

I suspect that it's an AWS environment problem, as I get this error 2 to 5 times in a row, and after that it runs fine for some minutes (or even hours) before getting the error again.

I don't know how to proceed to help debug this and fix the issue. Thanks!

Wheel for Python 3.8

Hi,

Could you please add support for Python 3.8? Including the wheel?

Thanks in advance,
Samuel

Attribute error if file does not exist

Source (opening a nonexistent file):

import aiofile
import asyncio

loop = asyncio.get_event_loop()


async def main():
    try:
        async with aiofile.AIOFile('x.txt', 'r', loop):
            pass
    except:
        pass


loop.run_until_complete(main())

I'd expect this to close normally with no error, but I get this:

Exception ignored in: <bound method AIOFile.__del__ of <AIOFile: 'x.txt'>>
Traceback (most recent call last):
  File ".../lib/python3.6/site-packages/aiofile/aio.py", line 74, in __del__
    self.close()
  File ".../lib/python3.6/site-packages/aiofile/aio.py", line 58, in close
    if self.__fileno == -2:
AttributeError: _AIOFile__fileno

UNIX - SystemError: (9, 'Bad File Descriptor') on writing to file by line

I have a script that works on my Windows 10 Pro machine but does not work in my AWS environment. Am I missing something simple?

MS Windows Version 2004 (OS Build 19041.685)
Python 3.8.0

UNIX Version:
uname -r: 4.14.214-160.339.amzn2.x86_64
NAME="Amazon Linux"
VERSION="2"
ID="amzn"
ID_LIKE="centos rhel fedora"
VERSION_ID="2"
PRETTY_NAME="Amazon Linux 2"
CPE_NAME="cpe:2.3:o:amazon:amazon_linux:2"
Python 3.8.0 (Manually compiled, same issue on the stock python 3.7.9 that I could install with yum)

Requirements file:

aiofile==3.3.3
aiohttp==3.7.3
async-timeout==3.0.1
attrs==20.3.0
caio==0.6.3
cffi==1.14.4
chardet==3.0.4
discord.py==1.6.0
idna==3.1
multidict==5.1.0
pycparser==2.20
PyNaCl==1.4.0
python-dotenv==0.15.0
six==1.15.0
typing-extensions==3.7.4.3
yarl==1.6.3

The following code throws the error:

async def save_8ball(guild_id):
    async with aiofile.async_open(os.path.join(CONFIG['8Ball']['FileDirectory'], f'{guild_id}.txt'), 'w')\
            as create_8ball:
        for resp in EIGHT_BALL_RESPONSES[guild_id]:
            await create_8ball.write(f'{resp}\r\n')

I am using a relative path for the os.path.join, as the config file is currently pointing to "./8ball"

Error Listing:

Traceback (most recent call last):
  File "/home/ec2-user/python_discord_bot/lib/python3.8/site-packages/caio/asyncio_base.py", line 43, in step
    self.context.submit(*operations.keys())
SystemError: (9, 'Bad file descriptor')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ec2-user/python_discord_bot/lib/python3.8/site-packages/discord/ext/commands/core.py", line 85, in wrapped
    ret = await coro(*args, **kwargs)
  File "./botGames.py", line 96, in prediction
    await create_guild_8ball(guild_id=ctx.guild.id)
  File "./botGames.py", line 66, in create_guild_8ball
    await save_8ball(guild_id=guild_id)
  File "./botGames.py", line 34, in save_8ball
    await create_8ball.write(f'{resp}\r\n')
  File "/home/ec2-user/python_discord_bot/lib/python3.8/site-packages/aiofile/utils.py", line 183, in __aexit__
    await self.close()
  File "/home/ec2-user/python_discord_bot/lib/python3.8/site-packages/aiofile/utils.py", line 176, in close
    await self.file.close()
  File "/home/ec2-user/python_discord_bot/lib/python3.8/site-packages/aiofile/aio.py", line 166, in close
    await self.fsync()
  File "/home/ec2-user/python_discord_bot/lib/python3.8/site-packages/aiofile/aio.py", line 261, in fsync
    return await self.__context.fdsync(self.fileno())
  File "/home/ec2-user/python_discord_bot/lib/python3.8/site-packages/caio/asyncio_base.py", line 87, in submit
    await future
  File "/home/ec2-user/python_discord_bot/lib/python3.8/site-packages/caio/asyncio_base.py", line 48, in step
    self.context.submit(operation)
SystemError: (9, 'Bad file descriptor')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ec2-user/python_discord_bot/lib/python3.8/site-packages/discord/ext/commands/bot.py", line 902, in invoke
    await ctx.command.invoke(ctx)
  File "/home/ec2-user/python_discord_bot/lib/python3.8/site-packages/discord/ext/commands/core.py", line 1345, in invoke
    await super().invoke(ctx)
  File "/home/ec2-user/python_discord_bot/lib/python3.8/site-packages/discord/ext/commands/core.py", line 864, in invoke
    await injected(*ctx.args, **ctx.kwargs)
  File "/home/ec2-user/python_discord_bot/lib/python3.8/site-packages/discord/ext/commands/core.py", line 94, in wrapped
    raise CommandInvokeError(exc) from exc
discord.ext.commands.errors.CommandInvokeError: Command raised an exception: SystemError: (9, 'Bad file descriptor')

When I substitute aiofile.async_open with a regular open, this succeeds without issue on the Linux machine.
From what I've read online, this error usually means that the file was closed and then referenced again, but I'm writing into the file and then closing it.

I've also tried writing into the file using AIOFile with a Writer instead of async_open, but I get the same result.

Infinite reading from a file

ICRAR/ijson#38

import asyncio

from aiofile import AIOFile

data = b'hi'

async def main():
    with open("test.data", "wb") as fp:
        fp.write(data)
    async with AIOFile("test.data", "rb") as fp:
        while True:
            buf = await fp.read(2)
            if not buf:
                break
            print(buf)

asyncio.run(main())

Manual context management example on readme has a bug

The following code, as listed on the README page for managing contexts manually, has a bug.

import asyncio

from aiofile import async_open
from caio import linux_aio, thread_aio


async def main():
    linux_ctx = linux_aio.Context()
    threads_ctx = thread_aio.Context()

    async with async_open("/tmp/test.txt", "a", context=linux_ctx) as afp:
        await afp.write("Hello")

    async with async_open("/tmp/test.txt", "a", context=threads_ctx) as afp:
        print(await afp.read())


await main()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Input In [94], in <cell line: 18>()
     14     async with async_open("/tmp/test.txt", "a", context=threads_ctx) as afp:
     15         print(await afp.read())
---> 18 await main()

Input In [94], in main()
      8 linux_ctx = linux_aio.Context()
      9 threads_ctx = thread_aio.Context()
---> 11 async with async_open("/tmp/test.txt", "a", context=linux_ctx) as afp:
     12     await afp.write("Hello")
     14 async with async_open("/tmp/test.txt", "a", context=threads_ctx) as afp:

File /usr/local/lib/python3.10/site-packages/aiofile/utils.py:200, in FileIOWrapperBase.__aenter__(self)
    199 async def __aenter__(self) -> "FileIOWrapperBase":
--> 200     await self.file.open()
    201     return self

File /usr/local/lib/python3.10/site-packages/aiofile/aio.py:177, in AIOFile.open(self)
    174     return None
    176 if self.__open_result is None:
--> 177     self.__open_result = self._run_in_thread(
    178         open,
    179         self._fname,
    180         self._open_mode,
    181     )
    182     self._file_obj = await self.__open_result
    183     self.__open_result = None

File /usr/local/lib/python3.10/site-packages/aiofile/aio.py:154, in AIOFile._run_in_thread(self, func, *args, **kwargs)
    151 def _run_in_thread(
    152         self, func: "Callable[..., _T]", *args: Any, **kwargs: Any
    153 ) -> "asyncio.Future[_T]":
--> 154     return self.__context.loop.run_in_executor(
    155         self._executor, partial(func, *args, **kwargs),
    156     )

AttributeError: 'Context' object has no attribute 'loop'

AIOFile context manager loses data

When an AIOFile object is used as a context manager, some data is lost. Below is a slightly modified example from README.rst:

import asyncio
from aiofile import AIOFile, LineReader, async_open


async def main():

    print("\nAIOFile:")
    async with AIOFile("/tmp/hello.txt", 'w') as afp:

        for i in range(10):
            await afp.write("%d Hello World\n" % i)

        await afp.write("Tail-less string\n")

    async with AIOFile("/tmp/hello.txt", 'r') as afp:
        async for line in LineReader(afp):
            print(line[:-1])

    print("\nasync_open:")
    async with async_open("/tmp/hello.txt", 'w') as afp:

        for i in range(10):
            await afp.write("%d Hello World\n" % i)

        await afp.write("Tail-less string\n")

    async with AIOFile("/tmp/hello.txt", 'r') as afp:
        async for line in LineReader(afp):
            print(line[:-1])


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

The output is:

Python 3.8.16 (default, Dec 10 2022, 12:00:00) 
[GCC 7.5.0] on localhost
>>> 
AIOFile:
Tail-less string

async_open:
0 Hello World
1 Hello World
2 Hello World
3 Hello World
4 Hello World
5 Hello World
6 Hello World
7 Hello World
8 Hello World
9 Hello World
Tail-less string
>>> 

So it seems that async_open works fine, while the AIOFile context manager loses what was written in the for loop.

LineReader issue on Windows

When reading a file on Windows, the last line is not read correctly. Here is the example code:

async def read_text():
    path = os.getcwd()
    myfile = os.path.join(path, 'dummy.txt')
    a = 1
    async with AIOFile(myfile, 'r') as afp:
        async for line in LineReader(afp):
            print(a, ': ', line[:-1])
            a += 1

Where the dummy.txt file is this:

This is line 1
This is line 2
This is line 3
This is line 4
This is line 5
This is line 6
This is line 7
This is line 8

including the extra line at the end. The output is:

1 :  This is line 1
2 :  This is line 2
3 :  This is line 3
4 :  This is line 4
5 :  This is line 5
6 :  This is line 6
7 :  This is line 7
8 :  This is line 8
9 :  line 8
10 :

If the extra line is removed, the output is this:

1 :  This is line 1
2 :  This is line 2
3 :  This is line 3
4 :  This is line 4
5 :  This is line 5
6 :  This is line 6
7 :  This is line 7
8 :  This is line 8 line

Question: Load file in parallel

Is it possible to load from /tmp/huge.file:
10 bytes from offset 100
10 bytes from offset 200
10 bytes from offset 300
in parallel?
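Not an official answer from the maintainers, but a minimal sketch of what the question describes, assuming the low-level AIOFile.read(size, offset=...) API documented above:

import asyncio
from aiofile import AIOFile


async def main():
    async with AIOFile("/tmp/huge.file", "rb") as afp:
        # Each read is an independent operation at its own offset,
        # so the reads can be gathered and run concurrently.
        chunks = await asyncio.gather(
            afp.read(10, offset=100),
            afp.read(10, offset=200),
            afp.read(10, offset=300),
        )
        print(chunks)


asyncio.run(main())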

async_open doesn't fully mimic the behavior of Python file objects

The object that is produced by async_open has a seek() method. However, it only accepts two positional arguments (one being the object itself, and the other being the offset), while Python file objects accept three positional arguments (including the object itself), the third being the position from which the offset is applied (0 - beginning, 1 - current position, 2 - end of file). This leads to some conflicts between this module and other modules that rely on the default Python file interface. Will this ever be changed?
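Not part of the library, but a possible workaround while the wrapper only supports absolute offsets: a hypothetical helper that emulates the whence argument on top of seek() and tell(), assuming the wrapper's .file attribute is the underlying AIOFile and exposes a .name path:

import os


def seek_whence(afp, offset, whence=os.SEEK_SET):
    # Hypothetical helper: translate (offset, whence) into the absolute
    # position that the async_open wrapper's seek() expects.
    if whence == os.SEEK_SET:
        target = offset
    elif whence == os.SEEK_CUR:
        target = afp.tell() + offset
    elif whence == os.SEEK_END:
        # Assumption: afp.file is the underlying AIOFile and exposes .name
        target = os.stat(afp.file.name).st_size + offset
    else:
        raise ValueError(f"unsupported whence value: {whence}")
    afp.seek(target)
    return target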

Confusing issue with uvloop

Can someone explain?
I try to write to a file, but I get this traceback, and
the main thread freezes and nothing happens.
I use:

  • Ubuntu 18.04 LTS
  • python 3.6.1
  • uvloop~=0.14
  • aiofile==1.5.2
Traceback (most recent call last):
  File "aiofile/posix_aio.pyx", line 179, in aiofile.posix_aio.AIOOperation._set_result
  File "aiofile/posix_aio.pyx", line 180, in aiofile.posix_aio.AIOOperation._set_result
  File "uvloop/loop.pyx", line 1265, in uvloop.loop.Loop.call_soon_threadsafe
  File "uvloop/loop.pyx", line 634, in uvloop.loop.Loop._call_soon
  File "uvloop/cbhandles.pyx", line 341, in uvloop.loop.new_Handle
  File "uvloop/cbhandles.pyx", line 15, in uvloop.loop.Handle._set_loop
  File "uvloop/cbhandles.pyx", line 428, in uvloop.loop.extract_stack
ValueError: call stack is not deep enough
Exception ignored in: 'aiofile.posix_aio.on_event'
Traceback (most recent call last):
  File "aiofile/posix_aio.pyx", line 179, in aiofile.posix_aio.AIOOperation._set_result
  File "aiofile/posix_aio.pyx", line 180, in aiofile.posix_aio.AIOOperation._set_result
  File "uvloop/loop.pyx", line 1265, in uvloop.loop.Loop.call_soon_threadsafe
  File "uvloop/loop.pyx", line 634, in uvloop.loop.Loop._call_soon
  File "uvloop/cbhandles.pyx", line 341, in uvloop.loop.new_Handle
  File "uvloop/cbhandles.pyx", line 15, in uvloop.loop.Handle._set_loop
  File "uvloop/cbhandles.pyx", line 428, in uvloop.loop.extract_stack
ValueError: call stack is not deep enough

Examples for writing large files in append mode

Hello,

Thank you very much for your hard work on this. I am testing aiofile to see if I can write a large file (it might go up to 400-500 MB) from a continuous data stream in append mode. I can't seem to find an example of writing a large file in append mode using aiofile on PyPI or in the README. Can you please point me to an example?
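Not an official example from the maintainers, but a minimal sketch of streaming writes in append mode, assuming a hypothetical data_stream() async generator as the data source:

import asyncio

from aiofile import async_open


async def data_stream():
    # Hypothetical stand-in for a continuous data source
    for i in range(100_000):
        yield f"record {i}\n"


async def main():
    # "a" appends to the end of the file on every write
    async with async_open("/tmp/large.log", "a") as afp:
        async for chunk in data_stream():
            await afp.write(chunk)


asyncio.run(main())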

aiofile 3.0.0 raises (22, 'Invalid argument') if used on Ubuntu 18.04

aiofile 1.5.1 worked fine on Ubuntu 18.04, but aiofile 3.0.0 raises (22, 'Invalid argument') on Ubuntu 18.04.
aiofile 3.0.0 works fine on Ubuntu 20.04.

test script:

root@ubuntu-s-1vcpu-1gb-fra1-01:~# cat aiofile_test.py 
import asyncio
from aiofile import AIOFile


async def main():
    async with AIOFile("/tmp/hello.txt", 'w+') as afp:
        await afp.write("Hello ")
        await afp.write("world", offset=7)
        await afp.fsync()

        print(await afp.read())


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

output:

root@ubuntu-s-1vcpu-1gb-fra1-01:~# pip3 install aiofile==3.0.0
Collecting aiofile==3.0.0
  Using cached https://files.pythonhosted.org/packages/f9/e5/a3215b7da4fb90c3abb7850ae1f3fa4c7553ab33381c7f49aaccc8adc531/aiofile-3.0.0-py3-none-any.whl
Requirement already satisfied: caio~=0.5.3 in /usr/local/lib/python3.6/dist-packages (from aiofile==3.0.0)
Installing collected packages: aiofile
  Found existing installation: aiofile 1.5.1
    Uninstalling aiofile-1.5.1:
      Successfully uninstalled aiofile-1.5.1
Successfully installed aiofile-3.0.0

root@ubuntu-s-1vcpu-1gb-fra1-01:~# python3 aiofile_test.py 
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/caio/asyncio_base.py", line 43, in step
    self.context.submit(*operations.keys())
SystemError: (22, 'Invalid argument')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "aiofile_test.py", line 15, in <module>
    loop.run_until_complete(main())
  File "/usr/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "aiofile_test.py", line 11, in main
    print(await afp.read())
  File "/usr/local/lib/python3.6/dist-packages/aiofile/aio.py", line 163, in close
    await self.fsync()
  File "/usr/local/lib/python3.6/dist-packages/aiofile/aio.py", line 223, in fsync
    return await self.__context.fdsync(self.fileno())
  File "/usr/local/lib/python3.6/dist-packages/caio/asyncio_base.py", line 87, in submit
    await future
  File "/usr/local/lib/python3.6/dist-packages/caio/asyncio_base.py", line 48, in step
    self.context.submit(operation)
SystemError: (22, 'Invalid argument')

Works fine with Ubuntu 20.04.

regards Stefan

aiofile 3.1.1 pipe example not working under Debian stable

I'm running the aiofile 3.1.1 pipe example under Linux (Debian stable, python 3.7). It usually stops with no other output than this:

201021 14:17 ~ srn@lije{1}% python3 /tmp/z.py
Start reader
Start reader
Start writer
C-c C-cExited
Traceback (most recent call last):
  File "/tmp/z.py", line 46, in <module>
    loop.run_until_complete(main())
  File "/usr/lib/python3.7/asyncio/base_events.py", line 571, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
    self._run_once()
  File "/usr/lib/python3.7/asyncio/base_events.py", line 1775, in _run_once
    handle._run()
  File "/usr/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/usr/local/lib/python3.7/dist-packages/caio/asyncio_base.py", line 55, in _run
    step(operation, future)
  File "/usr/local/lib/python3.7/dist-packages/caio/asyncio_base.py", line 43, in step
    self.context.submit(*operations.keys())
KeyboardInterrupt
201021 14:18 ~ srn@lije{1}%

(Since nothing is output, I give up and press control-C.)

I have made this test several times, trying to use pdb and with other minor changes, and one of the tests yielded a few loop.time() outputs, like this:

Start reader
Start reader
Start writer
1131834.8124811131834.8126551131834.812847
1131834.813134
C-c C-c C-c C-cTraceback (most recent call last):
  File "/usr/local/ch-tools3/z.py", line 64, in <module>
    loop.run_until_complete(main())
  File "/usr/lib/python3.7/asyncio/base_events.py", line 571, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
    self._run_once()
  File "/usr/lib/python3.7/asyncio/base_events.py", line 1775, in _run_once
    handle._run()
  File "/usr/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/usr/local/lib/python3.7/dist-packages/caio-0.6.3-py3.7-linux-x86_64.egg/caio/asyncio_base.py", line 55, in _run
    step(operation, future)
  File "/usr/local/lib/python3.7/dist-packages/caio-0.6.3-py3.7-linux-x86_64.egg/caio/asyncio_base.py", line 43, in step
    self.context.submit(*operations.keys())

Here's /tmp/z.py, which I believe is a verbatim copy of the example from the README:

import os
import asyncio
from aiofile import AIOFile, Reader, Writer

async def reader(fname):
    print('Start reader')
    async with AIOFile(fname, 'r') as afp:
        while True:
            # Maximum expected chunk size, must be passed.
            # Otherwise will be read zero bytes
            # (because unix pipe has zero size)
            data = await afp.read(4096)
            print(data)

async def writer(fname):
    print('Start writer')
    async with AIOFile(fname, 'w') as afp:
        while True:
            await asyncio.sleep(1)
            await afp.write('%06f' % loop.time())

async def main():
    fifo_name = "/tmp/test.fifo"

    if os.path.exists(fifo_name):
        os.remove(fifo_name)

    os.mkfifo(fifo_name)

    # Starting two readers and one writer, but only one reader
    # will be reading at the same time.
    await asyncio.gather(
        reader(fifo_name),
        reader(fifo_name),
        writer(fifo_name),
    )

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
    loop.run_until_complete(main())
finally:
    # Shutting down and closing file descriptors after interrupt
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()
    print('Exited')


201021 14:18 ~ srn@lije{1}% uname -a
Linux lije 4.19.0-11-amd64 #1 SMP Debian 4.19.146-1 (2020-09-17) x86_64 GNU/Linux

os.pwrite() on windows !!

Hi,
today AIOFile stopped working and throws this exception (installed from pip in a venv):

ERROR 05:07:03 __eml:162 fetch module 'os' has no attribute 'pwrite'
Traceback (most recent call last):
  File "c:\xxx\pyimap\v.1.2\export__eml.py", line 150, in fetch
    await afp.write(_eml)
  File "C:\xxx\pyimap\v.1.2\venv\lib\site-packages\aiofile\aio.py", line 219, in write
    return await self.__context.write(bytes_data, self.__fileno, offset)
  File "C:\xxx\pyimap\v.1.2\venv\lib\site-packages\caio\asyncio_base.py", line 88, in submit
    return op.get_value()
  File "C:\xxx\pyimap\v.1.2\venv\lib\site-packages\caio\python_aio.py", line 188, in get_value
    raise self.exception
  File "C:\xxx\pyimap\v.1.2\venv\lib\site-packages\caio\python_aio.py", line 71, in _in_thread
    result = op_map[operation.opcode](operation)
  File "C:\xxx\pyimap\v.1.2\venv\lib\site-packages\caio\python_aio.py", line 39, in _handle_write
    return os.pwrite(
AttributeError: module 'os' has no attribute 'pwrite'

aiofile LineReader does a read for every line in spite of having multiple lines in CHUNK_SIZE

Long story short

LineReader is very slow

Expected behavior

LineReader is as fast as normal line reading.
Read Chunk size actually prevents extra reads.

Actual behavior

LineReader is slow, takes many ms per line.
LineReader causes a read for each line.

Steps to reproduce

import asyncio
import functools
import time

import aiofile


def print_on_call_decorator(func):
    @functools.wraps(func)
    def wrapper_decorator(*args, **kwargs):
        print("real read called")
        value = func(*args, **kwargs)
        return value

    return wrapper_decorator


aiofile.AIOFile.read_bytes = print_on_call_decorator(aiofile.AIOFile.read_bytes)


async def main():
    async with aiofile.AIOFile("test_line_iter_file", "r") as f:
        last_line_time = time.perf_counter()
        async for line in aiofile.LineReader(f, chunk_size=aiofile.LineReader.CHUNK_SIZE * 16*16):
            # print("line_time", time.perf_counter() - last_line_time)
            last_line_time = time.perf_counter()
            # print(line, end="")


if __name__ == "__main__":
    open("test_line_iter_file", "w").write("\n".join(str(i) for i in range(1000000)))
    asyncio.run(main())

Additional info

Sync version to compare:

import time

open("test_line_iter_file", "w").write("\n".join(str(i) for i in range(100000)))
start = time.perf_counter()
with open("test_line_iter_file", "r", buffering=4192 * 16) as f:
    last_line_time = time.perf_counter()
    for line in f:
        # print("line_time", time.perf_counter() - last_line_time)
        last_line_time = time.perf_counter()
        # print(line, end="")

print("end_time", time.perf_counter() - start)

My temporary solution, which only works with Python-recognized newlines from the file's __iter__; it's only twice as slow as the sync version:

import asyncio
import collections.abc
import functools
import io
import itertools
import time
from typing import Union, Self

import aiofile


class CustomLineReader(collections.abc.AsyncIterable):
    CHUNK_SIZE = 4192

    def __init__(
        self,
        aio_file: aiofile.AIOFile,
        offset: int = 0,
        chunk_size: int = CHUNK_SIZE,
        line_sep: str = "\n",
    ):
        self.__reader = aiofile.Reader(aio_file, chunk_size=chunk_size, offset=offset)

        self._buffer = None

        self.linesep = aio_file.encode_bytes(line_sep) if aio_file.mode.binary else line_sep

        self.chunk_iterator = None
        self.last_read = None

    async def setup_buffer(self, buffer_initialization=None):
        chunk = await self.__reader.read_chunk()
        if not chunk:
            raise StopAsyncIteration(chunk)

        if self._buffer:
            self._buffer.close()
            del self._buffer
        self._buffer = io.BytesIO() if self.__reader.file.mode.binary else io.StringIO()
        if buffer_initialization:
            self._buffer.write(buffer_initialization)

        self._buffer.write(chunk)
        self._buffer.seek(0)

        self.chunk_iterator = self._buffer.__iter__()

    async def __anext__(self) -> Union[bytes, str]:
        if not self._buffer:
            await self.setup_buffer()
        try:
            self.last_read = next(self.chunk_iterator)
            if self.last_read[-1] != "\n":
                await self.setup_buffer(self.last_read)
                self.last_read = next(self.chunk_iterator)
        except StopIteration:
            await self.setup_buffer(self.last_read)
            self.last_read = next(self.chunk_iterator)
        return self.last_read


    def __aiter__(self) -> Self:
        return self

Reading line by line

I have this csv file

"year","name","percent","sex"
1880,"John",0.081541,"boy"
1880,"William",0.080511,"boy"
1880,"James",0.050057,"boy"
1880,"Charles",0.045167,"boy"
1880,"George",0.043292,"boy"

and I want to read the second column.

I have this code:

import asyncio
from aiofile import AIOFile, LineReader, Writer
async def main():

    async with AIOFile("file.csv", 'r') as afp:
        async for line in LineReader(afp):
            print(line[:10])


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

but this only reads the first 10 characters of each line. How can I read the second column value of the simple CSV attached here?
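Not an official answer from the maintainers, but a minimal sketch of one way to do it: parse each line with the standard csv module instead of slicing characters (this assumes no field contains an embedded newline):

import asyncio
import csv
import io

from aiofile import AIOFile, LineReader


async def main():
    async with AIOFile("file.csv", 'r') as afp:
        async for line in LineReader(afp):
            # Parse the whole CSV line, then take the second column ("name")
            row = next(csv.reader(io.StringIO(line)))
            print(row[1])


asyncio.run(main())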

Performance check

I have tried to check performance of the library and found some confusing results. I am on Ubuntu 16.04 Python 3.7. This is the snippet I use to check performance: http://dpaste.com/3SH4BAZ
I check by toggling comment of lines 31 and 32.
When I run the sequential version the code is always faster (did a huge number of tries). In seconds, it is near this number: 0.0032964419997369987
When I run the async version the code is slower and near a value of 0.04831875699983357
Again, I run the tests a huge amount of times, to safely conclude that the sequential code is about 10 times faster than the asyncio one.
Am I using aiofile wrong? Do I miss something in the asyncio utilization?
Thanks in advance

Missing binary flag leads to short reads?

how to reproduce:

async with aiofile.AIOFile("10_mb_file", "rb") as f:
    async for c in aiofile.Reader(f, chunk_size=1024*1024):
        print(len(c))

expected: prints 10x 1048576
actual: prints 1x 587
system: win10, python 3.8.1 x64, aiofile pip 1.5.2

adding flags |= os.O_BINARY in aio.py after line 96 fixes the issue for me.
Not sure if this is an oversight or if I am misunderstanding something here.

aiofile writing binary file .wav on windows issue

Hello, I am using it to write a wav file from the internet asynchronously with aiohttp. It works fine on Linux, but the downloaded audio sounds weird on Windows.

    async def download_audio(self, task_line: TaskLine):
        url = f'{self.base_url}{task_line.raw_data_file_path}'
        logger.debug(url)
        async with aiohttp.ClientSession(cookie_jar=self._cookie_jar) as session:
            async with session.get(url=url) as resp:
                assert resp.status == 200, f'request error status code: {resp.status}'
                path = task_line.audio_file_path()
                dir, _ = os.path.split(path)
                if not os.path.exists(dir):
                    os.mkdir(dir)
                async with AIOFile(path, 'wb') as f:
                    await f.write(await resp.read())

When I change the file-writing code to use the built-in open, the downloaded audio file turns out fine.

    async def download_audio(self, task_line: TaskLine):
        url = f'{self.base_url}{task_line.raw_data_file_path}'
        logger.debug(url)
        async with aiohttp.ClientSession(cookie_jar=self._cookie_jar) as session:
            async with session.get(url=url) as resp:
                assert resp.status == 200, f'request error status code: {resp.status}'
                path = task_line.audio_file_path()
                dir, _ = os.path.split(path)
                if not os.path.exists(dir):
                    os.mkdir(dir)
                # async with AIOFile(path, 'wb') as f:
                #     await f.write(await resp.read())
                with open(path, 'wb') as f:
                    f.write(await resp.read())

Downloaded audio file info (screenshots comparing the AIOFile and open results omitted).

By the way, I am using a Manjaro 18 Linux system and Windows 10 LTSB 2016, both with the latest Python 3 version, 3.7.3.

aiofile        1.5.2
aiohttp        3.5.4

Can't install on Amazon Linux

I get the following error.

gcc -pthread -Wno-unused-result -Wsign-compare -DNDEBUG -O2 -g -pipe -Wall -Wp,-D_FORTIFY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -m64 -mtune=generic -D_GNU_SOURCE -fPIC -fwrapv -fPIC -I/usr/include/python3.7m -c caio/thread_aio.c -o build/temp.linux-x86_64-3.7/caio/thread_aio.o -g
caio/thread_aio.c:6:10: fatal error: Python.h: No such file or directory
 #include <Python.h>
          ^~~~~~~~~~
compilation terminated.
error: command 'gcc' failed with exit status 1

"@coroutine" decorator is deprecated since Python 3.8

When I test my program using aiofiles==0.5.0 (latest available version) with pytest in Python 3.8.0, I get the following warning:

[...]/lib/python3.8/site-packages/aiofiles/os.py:10: DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead
    def run(*args, loop=None, executor=None, **kwargs):

-- Docs: https://docs.pytest.org/en/latest/warnings.html

This is similar to e.g. this issue, where this information is probably also helpful for this repo:

Python 3.5 Documentation:

The async def type of coroutine was added in Python 3.5, and is recommended if there is no need to support older Python versions.
