"""
This module provides an easy interface for asynchronous file access.
Reading and writing to files in Python is typically a blocking operation. Using
`q2_open` instead of the built-in `open` provides the ability to do file operations
without blocking the main event loop by wrapping each file method in a thread based
executor.
There are minor differences between `open` and `q2_open`.
1) `q2_open` must be used as an asynchronous context manager (the `async with`
statement). This ensures that the file gets closed after exiting the `async with` block.
2) The resulting file handle provided by `q2_open` has the same methods as the file
handle from `open` but they are all called with `await`. Using the file handle from
`q2_open` as an iterator to loop over each line requires an `async for` loop rather than
the normal `for` loop.
"""
import asyncio
from inspect import stack
from collections.abc import AsyncGenerator, Iterable, AsyncIterator
from contextlib import asynccontextmanager
from enum import IntEnum
from io import SEEK_SET, SEEK_END, SEEK_CUR, TextIOWrapper
from os import PathLike
from pathlib import Path
from typing import Literal, AnyStr, Any
type FileMode = Literal[
"r+",
"+r",
"rt+",
"r+t",
"+rt",
"tr+",
"t+r",
"+tr",
"w+",
"+w",
"wt+",
"w+t",
"+wt",
"tw+",
"t+w",
"+tw",
"a+",
"+a",
"at+",
"a+t",
"+at",
"ta+",
"t+a",
"+ta",
"x+",
"+x",
"xt+",
"x+t",
"+xt",
"tx+",
"t+x",
"+tx",
"w",
"wt",
"tw",
"a",
"at",
"ta",
"x",
"xt",
"tx",
"r",
"rt",
"tr",
"U",
"rU",
"Ur",
"rtU",
"rUt",
"Urt",
"trU",
"tUr",
"Utr",
]
[docs]
class Whence(IntEnum):
    """Named reference points for a seek offset, mirroring the `io.SEEK_*` constants."""

    # Offset is relative to the start of the stream.
    SET = SEEK_SET
    # Offset is relative to the current stream position.
    CUR = SEEK_CUR
    # Offset is relative to the end of the stream.
    END = SEEK_END
async def _thread_execute(func, *args):
return await asyncio.get_event_loop().run_in_executor(None, func, *args)
class AsyncFile:
    """
    Awaitable facade over an already-open text file object.

    Methods that may block on I/O (`read`, `write`, `seek`, ...) are
    dispatched through `_thread_execute` and must be awaited. Attribute-style
    lookups and the `readable`/`writable`/`fileno` queries are forwarded to
    the wrapped file object synchronously.

    Instances are meant to be created by `q2_open` only; the constructor
    rejects every other direct caller.
    """

    def __init__(self, path: PathLike | str, file_obj: TextIOWrapper):
        """
        Wrap `file_obj`, remembering the `path` it was opened from.

        :param path: Location of the file; a `str` is converted to a `Path`,
            any other value is stored unchanged.
        :param file_obj: The open file object all operations are delegated to.
        :raises ValueError: If the immediate caller is not a function named
            `q2_open`.
        """
        # Only the caller's function name is needed. `context=0` keeps
        # `inspect` from loading source lines for every frame on the stack,
        # which would be blocking file I/O.
        caller = stack(context=0)[1].function
        if caller != "q2_open":
            raise ValueError("You must call the function 'q2_open' to use AsyncFile")
        self.path = Path(path) if isinstance(path, str) else path
        self._fd = file_obj

    @property
    def name(self) -> str:
        """The `name` attribute of the wrapped file object."""
        return self._fd.name

    @property
    def buffer(self) -> object:
        """The `buffer` attribute of the wrapped file object."""
        return self._fd.buffer

    @property
    def closed(self) -> bool:
        """Whether the wrapped file object reports itself as closed."""
        return self._fd.closed

    @property
    def line_buffering(self) -> bool:
        """The `line_buffering` attribute of the wrapped file object."""
        return self._fd.line_buffering

    @property
    def mode(self) -> str:
        """The `mode` attribute of the wrapped file object."""
        return self._fd.mode

    @property
    def write_through(self) -> bool:
        """The `write_through` attribute of the wrapped file object."""
        return self._fd.write_through

    @property
    def newlines(self) -> Any:
        """The `newlines` attribute of the wrapped file object."""
        return self._fd.newlines

    @property
    def encoding(self) -> str:
        """The `encoding` attribute of the wrapped file object."""
        return self._fd.encoding

    async def __aiter__(self) -> AsyncIterator[str]:
        """Yield lines one at a time (for `async for`) until `readline` returns an empty string."""
        while line := await self.readline():
            yield line

    def readable(self) -> bool:
        """Return the wrapped file object's `readable()` result."""
        return self._fd.readable()

    def writable(self) -> bool:
        """Return the wrapped file object's `writable()` result."""
        return self._fd.writable()

    def fileno(self) -> int:
        """Return the wrapped file object's `fileno()` result."""
        return self._fd.fileno()

    async def read(self, size: int = -1) -> str:
        """Await the wrapped file's `read(size)`."""
        return await _thread_execute(self._fd.read, size)

    async def readline(self, size: int = -1) -> str:
        """Await the wrapped file's `readline(size)`."""
        return await _thread_execute(self._fd.readline, size)

    async def readlines(self, hint: int = -1) -> list[str]:
        """Await the wrapped file's `readlines(hint)`."""
        return await _thread_execute(self._fd.readlines, hint)

    async def write(self, data: AnyStr) -> int:
        """Await the wrapped file's `write(data)` and return its result."""
        return await _thread_execute(self._fd.write, data)

    async def writelines(self, data: Iterable[AnyStr]) -> None:
        """Await the wrapped file's `writelines(data)`."""
        return await _thread_execute(self._fd.writelines, data)

    async def truncate(self, size: int | None = None) -> int:
        """
        Await the wrapped file's `truncate(size)`.

        :param size: Forwarded unchanged to the wrapped file's `truncate`.
            The default `None` is the same default the built-in text file
            object uses.
        :return: The value returned by the wrapped `truncate` call.
        """
        return await _thread_execute(self._fd.truncate, size)

    async def seek(self, offset: int, whence: Whence | Literal[0, 1, 2] = 0) -> int:
        """Await the wrapped file's `seek(offset, whence)` and return its result."""
        return await _thread_execute(self._fd.seek, offset, whence)

    async def tell(self) -> int:
        """Await the wrapped file's `tell()`."""
        return await _thread_execute(self._fd.tell)

    async def flush(self) -> None:
        """Await the wrapped file's `flush()`."""
        await _thread_execute(self._fd.flush)

    async def close(self) -> None:
        """Await the wrapped file's `close()`."""
        await _thread_execute(self._fd.close)
[docs]
@asynccontextmanager
async def q2_open(
    path: PathLike | str, mode: FileMode = "r"
) -> AsyncGenerator[AsyncFile, None]:
    """
    Analogous to `open()` but provides non-blocking async operations.

    The file is opened through `_thread_execute` and is closed when the
    `async with` block exits, whether it exits normally or with an exception.

    :param path: Location of the file to open.
    :param mode: Mode string passed to the built-in `open()`.
    :return: An async context manager yielding an `AsyncFile`.

    Usage:

    .. code-block:: python

        async def example_func(path: Path) -> str:
            contents = ""
            async with q2_open(path) as file_handle:
                async for line in file_handle:
                    if line.startswith("#"):
                        contents = line
                        break
            return contents
    """
    file_obj = await _thread_execute(open, path, mode)
    try:
        # AsyncFile must be constructed directly in this function: its
        # constructor checks that the caller is named "q2_open".
        handle = AsyncFile(path, file_obj)
        yield handle
    finally:
        # Runs even when the `async with` body (or the AsyncFile constructor)
        # raises, so the underlying file is never leaked.
        await _thread_execute(file_obj.close)