mirror of
https://github.com/Ladebeze66/projetcbaollm.git
synced 2025-12-16 20:17:51 +01:00
63 lines
2.3 KiB
Python
63 lines
2.3 KiB
Python
""" This module contains type hints for the anyio library. It was auto-generated so may include errors."""
|
|
from typing import Any, Callable, Coroutine, TypeVar, overload, Optional, Union
|
|
from types import TracebackType
|
|
|
|
T = TypeVar('T')
|
|
T_Retval = TypeVar('T_Retval')
|
|
|
|
class CapacityLimiter:
|
|
def __init__(self, total_tokens: float): ...
|
|
async def acquire(self) -> None: ...
|
|
async def acquire_nowait(self) -> None: ...
|
|
def release(self) -> None: ...
|
|
@property
|
|
def total_tokens(self) -> float: ...
|
|
@property
|
|
def available_tokens(self) -> float: ...
|
|
def __enter__(self) -> 'CapacityLimiter': ...
|
|
def __exit__(self, exc_type: Optional[type], exc_val: Optional[BaseException],
|
|
exc_tb: Optional[TracebackType]) -> None: ...
|
|
|
|
class to_thread:
|
|
@staticmethod
|
|
def run_sync(func: Callable[..., T], *args: Any, cancellable: bool = False,
|
|
limiter: Optional[CapacityLimiter] = None, **kwargs: Any) -> Coroutine[Any, Any, T]: ...
|
|
|
|
@overload
|
|
def run(func: Callable[[], T_Retval], *, backend: Optional[str] = ...,
|
|
backend_options: Optional[dict] = ...) -> T_Retval: ...
|
|
|
|
@overload
|
|
def run(func: Callable[..., T_Retval], *args: Any, backend: Optional[str] = ...,
|
|
backend_options: Optional[dict] = ..., **kwargs: Any) -> T_Retval: ...
|
|
|
|
def sleep(delay: Union[int, float]) -> Coroutine[Any, Any, None]: ...
|
|
|
|
async def sleep_forever() -> None: ...
|
|
|
|
def current_time() -> float: ...
|
|
|
|
def get_cancelled_exc_class() -> type[BaseException]: ...
|
|
|
|
def create_task_group() -> 'TaskGroup': ...
|
|
|
|
class TaskGroup:
|
|
async def __aenter__(self) -> 'TaskGroup': ...
|
|
async def __aexit__(self, exc_type: Optional[type], exc_val: Optional[BaseException],
|
|
exc_tb: Optional[TracebackType]) -> bool: ...
|
|
def start_soon(self, func: Callable[..., Coroutine[Any, Any, Any]], *args: Any,
|
|
name: Optional[str] = None, **kwargs: Any) -> None: ...
|
|
|
|
def create_memory_object_stream(
|
|
max_buffer_size: int = 0
|
|
) -> tuple['MemoryObjectSender', 'MemoryObjectReceiver']: ...
|
|
|
|
class MemoryObjectSender:
|
|
async def send(self, item: Any) -> None: ...
|
|
def send_nowait(self, item: Any) -> None: ...
|
|
|
|
class MemoryObjectReceiver:
|
|
async def receive(self) -> Any: ...
|
|
def receive_nowait(self) -> Any: ...
|
|
|