# Source code for q2_sdk.core.q2_open

"""
This module provides an easy interface for asynchronous file access

Reading and writing to files in Python is typically a blocking operation. Using
`q2_open` instead of the built-in `open` provides the ability to do file operations
without blocking the main event loop by wrapping each file method in a thread-based
executor.

There are minor differences between `open` and `q2_open`.

1) `q2_open` must be used as an asynchronous context manager (the `async with`
statement). This ensures that the file gets closed after exiting the `async with` block.

2) The resulting file handle provided by `q2_open` has the same methods as the file
handle from `open` but they are all called with `await`. Using the file handle from
`q2_open` as an iterator to loop over each line requires an `async for` loop rather than
the normal `for` loop.
"""

import asyncio
import sys
from collections.abc import AsyncGenerator, AsyncIterator, Iterable
from contextlib import asynccontextmanager
from enum import IntEnum
from inspect import stack
from io import SEEK_CUR, SEEK_END, SEEK_SET, TextIOWrapper
from os import PathLike
from pathlib import Path
from typing import Any, AnyStr, Literal

# Mode strings accepted by `q2_open` (it is the annotation of that function's `mode`
# parameter). Each mode is spelled out in every character order; no binary ("b")
# variant is listed, so only text-mode files can be requested through this alias.
# NOTE(review): the "U" spellings look like they mirror the classic text-mode list;
# recent Python versions reject "U" in `open()` - confirm whether they should stay.
type FileMode = Literal[
    "r+",
    "+r",
    "rt+",
    "r+t",
    "+rt",
    "tr+",
    "t+r",
    "+tr",
    "w+",
    "+w",
    "wt+",
    "w+t",
    "+wt",
    "tw+",
    "t+w",
    "+tw",
    "a+",
    "+a",
    "at+",
    "a+t",
    "+at",
    "ta+",
    "t+a",
    "+ta",
    "x+",
    "+x",
    "xt+",
    "x+t",
    "+xt",
    "tx+",
    "t+x",
    "+tx",
    "w",
    "wt",
    "tw",
    "a",
    "at",
    "ta",
    "x",
    "xt",
    "tx",
    "r",
    "rt",
    "tr",
    "U",
    "rU",
    "Ur",
    "rtU",
    "rUt",
    "Urt",
    "trU",
    "tUr",
    "Utr",
]


class Whence(IntEnum):
    """
    Reference points for `AsyncFile.seek`, mirroring the `io.SEEK_*` constants.

    Members are plain integers, so they can be passed anywhere the built-in
    file API expects a `whence` value.
    """

    SET = SEEK_SET  # offset is relative to the start of the file
    CUR = SEEK_CUR  # offset is relative to the current position
    END = SEEK_END  # offset is relative to the end of the file


async def _thread_execute(func, *args):
    """
    Run ``func(*args)`` in the event loop's default executor and return its result.

    :param func: Blocking callable to run outside the event loop thread
    :param args: Positional arguments forwarded to ``func``
    """
    # This coroutine can only execute while a loop is running, so ask for that
    # loop explicitly instead of relying on get_event_loop()'s legacy lookup.
    return await asyncio.get_running_loop().run_in_executor(None, func, *args)


class AsyncFile:
    """
    Awaitable wrapper around a text file object opened by `q2_open`.

    Methods that perform file operations (read, write, seek, flush, close, ...)
    are coroutines that run the wrapped file's method in an executor thread.
    Attribute and capability lookups are passed straight through to the file.
    """

    def __init__(self, path: PathLike | str, file_obj: TextIOWrapper):
        """
        :param path: Location the file was opened from; a ``str`` is stored as a ``Path``
        :param file_obj: The already opened file object to wrap
        :raises ValueError: If the caller is not a function named ``q2_open``
        """
        # Look at the immediate caller's frame only. inspect.stack() would walk the
        # entire call stack and load source context for every frame, which is
        # needless blocking work in an async code path.
        if sys._getframe(1).f_code.co_name != "q2_open":
            raise ValueError("You must call the function 'q2_open' to use AsyncFile")
        self.path = Path(path) if isinstance(path, str) else path
        self._fd = file_obj

    @property
    def name(self) -> str:
        """Name of the wrapped file object."""
        return self._fd.name

    @property
    def buffer(self) -> object:
        """Underlying buffer of the wrapped file object."""
        return self._fd.buffer

    @property
    def closed(self) -> bool:
        """Whether the wrapped file object has been closed."""
        return self._fd.closed

    @property
    def line_buffering(self) -> bool:
        """The wrapped file object's ``line_buffering`` setting."""
        return self._fd.line_buffering

    @property
    def mode(self) -> str:
        """Mode string reported by the wrapped file object."""
        return self._fd.mode

    @property
    def write_through(self) -> bool:
        """The wrapped file object's ``write_through`` setting."""
        return self._fd.write_through

    @property
    def newlines(self) -> Any:
        """The wrapped file object's ``newlines`` attribute."""
        return self._fd.newlines

    @property
    def encoding(self) -> str:
        """Text encoding reported by the wrapped file object."""
        return self._fd.encoding

    async def __aiter__(self) -> AsyncIterator[AnyStr]:
        """Yield lines one at a time until `readline` returns an empty value."""
        while line := await self.readline():
            yield line

    def readable(self) -> bool:
        """Return whether the wrapped file object can be read from."""
        return self._fd.readable()

    def writable(self) -> bool:
        """Return whether the wrapped file object can be written to."""
        return self._fd.writable()

    def fileno(self) -> int:
        """Return the wrapped file object's file descriptor number."""
        return self._fd.fileno()

    async def read(self, size: int = -1) -> str:
        """Read up to ``size`` characters; the default of -1 is passed through unchanged."""
        return await _thread_execute(self._fd.read, size)

    async def readline(self, size: int = -1) -> str:
        """Read a single line, forwarding ``size`` to the wrapped file's ``readline``."""
        return await _thread_execute(self._fd.readline, size)

    async def readlines(self, hint: int = -1) -> list[str]:
        """Read lines into a list, forwarding ``hint`` to the wrapped file's ``readlines``."""
        return await _thread_execute(self._fd.readlines, hint)

    async def write(self, data: AnyStr) -> int:
        """Write ``data`` and return the value reported by the wrapped file's ``write``."""
        return await _thread_execute(self._fd.write, data)

    async def writelines(self, data: Iterable[AnyStr]) -> None:
        """Write every item of ``data`` via the wrapped file's ``writelines``."""
        return await _thread_execute(self._fd.writelines, data)

    async def truncate(self, size: int | None = None) -> int:
        """
        Resize the file and return the value reported by the wrapped file's ``truncate``.

        :param size: Target size; ``None`` (the default) truncates at the current
            position, matching the built-in file API
        """
        return await _thread_execute(self._fd.truncate, size)

    async def seek(self, offset: int, whence: Whence | Literal[0, 1, 2] = 0) -> int:
        """
        Move the file position and return the value reported by the wrapped file's ``seek``.

        :param offset: Offset forwarded to the wrapped file
        :param whence: Reference point, as a `Whence` member or its integer value
        """
        return await _thread_execute(self._fd.seek, offset, whence)

    async def tell(self) -> int:
        """Return the current position reported by the wrapped file's ``tell``."""
        return await _thread_execute(self._fd.tell)

    async def flush(self) -> None:
        """Flush the wrapped file object."""
        await _thread_execute(self._fd.flush)

    async def close(self) -> None:
        """Close the wrapped file object."""
        await _thread_execute(self._fd.close)


@asynccontextmanager
async def q2_open(
    path: PathLike | str, mode: FileMode = "r"
) -> AsyncGenerator[AsyncFile, None]:
    """
    Analogous to `open()` but provides non-blocking async operations.

    The file is opened in an executor thread, wrapped in an `AsyncFile`, and
    closed when the `async with` block exits, including when the block raises.

    :param path: Location of the file to open
    :param mode: Text mode string passed to `open()`

    Usage:

    .. code-block:: python

        async def example_func(path: Path) -> str:
            contents = ""
            async with q2_open(path) as file_handle:
                async for line in file_handle:
                    if line.startswith("#"):
                        contents = line
                        break
            return contents
    """
    file_obj = await _thread_execute(open, path, mode)
    file = AsyncFile(path, file_obj)
    try:
        yield file
    finally:
        # An exception from the `async with` body is re-raised at the yield above;
        # without the finally clause the close would be skipped and the handle leaked.
        await file.close()