diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/rich/progress.py')
-rw-r--r-- | venv/lib/python3.11/site-packages/rich/progress.py | 1699 |
1 files changed, 0 insertions, 1699 deletions
diff --git a/venv/lib/python3.11/site-packages/rich/progress.py b/venv/lib/python3.11/site-packages/rich/progress.py deleted file mode 100644 index 8810aea..0000000 --- a/venv/lib/python3.11/site-packages/rich/progress.py +++ /dev/null @@ -1,1699 +0,0 @@ -import io -import sys -import typing -import warnings -from abc import ABC, abstractmethod -from collections import deque -from dataclasses import dataclass, field -from datetime import timedelta -from io import RawIOBase, UnsupportedOperation -from math import ceil -from mmap import mmap -from operator import length_hint -from os import PathLike, stat -from threading import Event, RLock, Thread -from types import TracebackType -from typing import ( - Any, - BinaryIO, - Callable, - ContextManager, - Deque, - Dict, - Generic, - Iterable, - List, - NamedTuple, - NewType, - Optional, - Sequence, - TextIO, - Tuple, - Type, - TypeVar, - Union, -) - -if sys.version_info >= (3, 8): - from typing import Literal -else: - from typing_extensions import Literal # pragma: no cover - -from . import filesize, get_console -from .console import Console, Group, JustifyMethod, RenderableType -from .highlighter import Highlighter -from .jupyter import JupyterMixin -from .live import Live -from .progress_bar import ProgressBar -from .spinner import Spinner -from .style import StyleType -from .table import Column, Table -from .text import Text, TextType - -TaskID = NewType("TaskID", int) - -ProgressType = TypeVar("ProgressType") - -GetTimeCallable = Callable[[], float] - - -_I = typing.TypeVar("_I", TextIO, BinaryIO) - - -class _TrackThread(Thread): - """A thread to periodically update progress.""" - - def __init__(self, progress: "Progress", task_id: "TaskID", update_period: float): - self.progress = progress - self.task_id = task_id - self.update_period = update_period - self.done = Event() - - self.completed = 0 - super().__init__() - - def run(self) -> None: - task_id = self.task_id - advance = self.progress.advance - update_period = self.update_period - last_completed = 0 - wait = self.done.wait - while not wait(update_period): - completed = self.completed - if last_completed != completed: - advance(task_id, completed - last_completed) - last_completed = completed - - self.progress.update(self.task_id, completed=self.completed, refresh=True) - - def __enter__(self) -> "_TrackThread": - self.start() - return self - - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: - self.done.set() - self.join() - - -def track( - sequence: Union[Sequence[ProgressType], Iterable[ProgressType]], - description: str = "Working...", - total: Optional[float] = None, - auto_refresh: bool = True, - console: Optional[Console] = None, - transient: bool = False, - get_time: Optional[Callable[[], float]] = None, - refresh_per_second: float = 10, - style: StyleType = "bar.back", - complete_style: StyleType = "bar.complete", - finished_style: StyleType = "bar.finished", - pulse_style: StyleType = "bar.pulse", - update_period: float = 0.1, - disable: bool = False, - show_speed: bool = True, -) -> Iterable[ProgressType]: - """Track progress by iterating over a sequence. - - Args: - sequence (Iterable[ProgressType]): A sequence (must support "len") you wish to iterate over. - description (str, optional): Description of task show next to progress bar. Defaults to "Working". - total: (float, optional): Total number of steps. Default is len(sequence). - auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True. - transient: (bool, optional): Clear the progress on exit. Defaults to False. - console (Console, optional): Console to write to. Default creates internal Console instance. - refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10. - style (StyleType, optional): Style for the bar background. Defaults to "bar.back". - complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". - finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". - pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". - update_period (float, optional): Minimum time (in seconds) between calls to update(). Defaults to 0.1. - disable (bool, optional): Disable display of progress. - show_speed (bool, optional): Show speed if total isn't known. Defaults to True. - Returns: - Iterable[ProgressType]: An iterable of the values in the sequence. - - """ - - columns: List["ProgressColumn"] = ( - [TextColumn("[progress.description]{task.description}")] if description else [] - ) - columns.extend( - ( - BarColumn( - style=style, - complete_style=complete_style, - finished_style=finished_style, - pulse_style=pulse_style, - ), - TaskProgressColumn(show_speed=show_speed), - TimeRemainingColumn(elapsed_when_finished=True), - ) - ) - progress = Progress( - *columns, - auto_refresh=auto_refresh, - console=console, - transient=transient, - get_time=get_time, - refresh_per_second=refresh_per_second or 10, - disable=disable, - ) - - with progress: - yield from progress.track( - sequence, total=total, description=description, update_period=update_period - ) - - -class _Reader(RawIOBase, BinaryIO): - """A reader that tracks progress while it's being read from.""" - - def __init__( - self, - handle: BinaryIO, - progress: "Progress", - task: TaskID, - close_handle: bool = True, - ) -> None: - self.handle = handle - self.progress = progress - self.task = task - self.close_handle = close_handle - self._closed = False - - def __enter__(self) -> "_Reader": - self.handle.__enter__() - return self - - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: - self.close() - - def __iter__(self) -> BinaryIO: - return self - - def __next__(self) -> bytes: - line = next(self.handle) - self.progress.advance(self.task, advance=len(line)) - return line - - @property - def closed(self) -> bool: - return self._closed - - def fileno(self) -> int: - return self.handle.fileno() - - def isatty(self) -> bool: - return self.handle.isatty() - - @property - def mode(self) -> str: - return self.handle.mode - - @property - def name(self) -> str: - return self.handle.name - - def readable(self) -> bool: - return self.handle.readable() - - def seekable(self) -> bool: - return self.handle.seekable() - - def writable(self) -> bool: - return False - - def read(self, size: int = -1) -> bytes: - block = self.handle.read(size) - self.progress.advance(self.task, advance=len(block)) - return block - - def readinto(self, b: Union[bytearray, memoryview, mmap]): # type: ignore[no-untyped-def, override] - n = self.handle.readinto(b) # type: ignore[attr-defined] - self.progress.advance(self.task, advance=n) - return n - - def readline(self, size: int = -1) -> bytes: # type: ignore[override] - line = self.handle.readline(size) - self.progress.advance(self.task, advance=len(line)) - return line - - def readlines(self, hint: int = -1) -> List[bytes]: - lines = self.handle.readlines(hint) - self.progress.advance(self.task, advance=sum(map(len, lines))) - return lines - - def close(self) -> None: - if self.close_handle: - self.handle.close() - self._closed = True - - def seek(self, offset: int, whence: int = 0) -> int: - pos = self.handle.seek(offset, whence) - self.progress.update(self.task, completed=pos) - return pos - - def tell(self) -> int: - return self.handle.tell() - - def write(self, s: Any) -> int: - raise UnsupportedOperation("write") - - -class _ReadContext(ContextManager[_I], Generic[_I]): - """A utility class to handle a context for both a reader and a progress.""" - - def __init__(self, progress: "Progress", reader: _I) -> None: - self.progress = progress - self.reader: _I = reader - - def __enter__(self) -> _I: - self.progress.start() - return self.reader.__enter__() - - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: - self.progress.stop() - self.reader.__exit__(exc_type, exc_val, exc_tb) - - -def wrap_file( - file: BinaryIO, - total: int, - *, - description: str = "Reading...", - auto_refresh: bool = True, - console: Optional[Console] = None, - transient: bool = False, - get_time: Optional[Callable[[], float]] = None, - refresh_per_second: float = 10, - style: StyleType = "bar.back", - complete_style: StyleType = "bar.complete", - finished_style: StyleType = "bar.finished", - pulse_style: StyleType = "bar.pulse", - disable: bool = False, -) -> ContextManager[BinaryIO]: - """Read bytes from a file while tracking progress. - - Args: - file (Union[str, PathLike[str], BinaryIO]): The path to the file to read, or a file-like object in binary mode. - total (int): Total number of bytes to read. - description (str, optional): Description of task show next to progress bar. Defaults to "Reading". - auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True. - transient: (bool, optional): Clear the progress on exit. Defaults to False. - console (Console, optional): Console to write to. Default creates internal Console instance. - refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10. - style (StyleType, optional): Style for the bar background. Defaults to "bar.back". - complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". - finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". - pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". - disable (bool, optional): Disable display of progress. - Returns: - ContextManager[BinaryIO]: A context manager yielding a progress reader. - - """ - - columns: List["ProgressColumn"] = ( - [TextColumn("[progress.description]{task.description}")] if description else [] - ) - columns.extend( - ( - BarColumn( - style=style, - complete_style=complete_style, - finished_style=finished_style, - pulse_style=pulse_style, - ), - DownloadColumn(), - TimeRemainingColumn(), - ) - ) - progress = Progress( - *columns, - auto_refresh=auto_refresh, - console=console, - transient=transient, - get_time=get_time, - refresh_per_second=refresh_per_second or 10, - disable=disable, - ) - - reader = progress.wrap_file(file, total=total, description=description) - return _ReadContext(progress, reader) - - -@typing.overload -def open( - file: Union[str, "PathLike[str]", bytes], - mode: Union[Literal["rt"], Literal["r"]], - buffering: int = -1, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, - *, - total: Optional[int] = None, - description: str = "Reading...", - auto_refresh: bool = True, - console: Optional[Console] = None, - transient: bool = False, - get_time: Optional[Callable[[], float]] = None, - refresh_per_second: float = 10, - style: StyleType = "bar.back", - complete_style: StyleType = "bar.complete", - finished_style: StyleType = "bar.finished", - pulse_style: StyleType = "bar.pulse", - disable: bool = False, -) -> ContextManager[TextIO]: - pass - - -@typing.overload -def open( - file: Union[str, "PathLike[str]", bytes], - mode: Literal["rb"], - buffering: int = -1, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, - *, - total: Optional[int] = None, - description: str = "Reading...", - auto_refresh: bool = True, - console: Optional[Console] = None, - transient: bool = False, - get_time: Optional[Callable[[], float]] = None, - refresh_per_second: float = 10, - style: StyleType = "bar.back", - complete_style: StyleType = "bar.complete", - finished_style: StyleType = "bar.finished", - pulse_style: StyleType = "bar.pulse", - disable: bool = False, -) -> ContextManager[BinaryIO]: - pass - - -def open( - file: Union[str, "PathLike[str]", bytes], - mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r", - buffering: int = -1, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, - *, - total: Optional[int] = None, - description: str = "Reading...", - auto_refresh: bool = True, - console: Optional[Console] = None, - transient: bool = False, - get_time: Optional[Callable[[], float]] = None, - refresh_per_second: float = 10, - style: StyleType = "bar.back", - complete_style: StyleType = "bar.complete", - finished_style: StyleType = "bar.finished", - pulse_style: StyleType = "bar.pulse", - disable: bool = False, -) -> Union[ContextManager[BinaryIO], ContextManager[TextIO]]: - """Read bytes from a file while tracking progress. - - Args: - path (Union[str, PathLike[str], BinaryIO]): The path to the file to read, or a file-like object in binary mode. - mode (str): The mode to use to open the file. Only supports "r", "rb" or "rt". - buffering (int): The buffering strategy to use, see :func:`io.open`. - encoding (str, optional): The encoding to use when reading in text mode, see :func:`io.open`. - errors (str, optional): The error handling strategy for decoding errors, see :func:`io.open`. - newline (str, optional): The strategy for handling newlines in text mode, see :func:`io.open` - total: (int, optional): Total number of bytes to read. Must be provided if reading from a file handle. Default for a path is os.stat(file).st_size. - description (str, optional): Description of task show next to progress bar. Defaults to "Reading". - auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True. - transient: (bool, optional): Clear the progress on exit. Defaults to False. - console (Console, optional): Console to write to. Default creates internal Console instance. - refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10. - style (StyleType, optional): Style for the bar background. Defaults to "bar.back". - complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". - finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". - pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". - disable (bool, optional): Disable display of progress. - encoding (str, optional): The encoding to use when reading in text mode. - - Returns: - ContextManager[BinaryIO]: A context manager yielding a progress reader. - - """ - - columns: List["ProgressColumn"] = ( - [TextColumn("[progress.description]{task.description}")] if description else [] - ) - columns.extend( - ( - BarColumn( - style=style, - complete_style=complete_style, - finished_style=finished_style, - pulse_style=pulse_style, - ), - DownloadColumn(), - TimeRemainingColumn(), - ) - ) - progress = Progress( - *columns, - auto_refresh=auto_refresh, - console=console, - transient=transient, - get_time=get_time, - refresh_per_second=refresh_per_second or 10, - disable=disable, - ) - - reader = progress.open( - file, - mode=mode, - buffering=buffering, - encoding=encoding, - errors=errors, - newline=newline, - total=total, - description=description, - ) - return _ReadContext(progress, reader) # type: ignore[return-value, type-var] - - -class ProgressColumn(ABC): - """Base class for a widget to use in progress display.""" - - max_refresh: Optional[float] = None - - def __init__(self, table_column: Optional[Column] = None) -> None: - self._table_column = table_column - self._renderable_cache: Dict[TaskID, Tuple[float, RenderableType]] = {} - self._update_time: Optional[float] = None - - def get_table_column(self) -> Column: - """Get a table column, used to build tasks table.""" - return self._table_column or Column() - - def __call__(self, task: "Task") -> RenderableType: - """Called by the Progress object to return a renderable for the given task. - - Args: - task (Task): An object containing information regarding the task. - - Returns: - RenderableType: Anything renderable (including str). - """ - current_time = task.get_time() - if self.max_refresh is not None and not task.completed: - try: - timestamp, renderable = self._renderable_cache[task.id] - except KeyError: - pass - else: - if timestamp + self.max_refresh > current_time: - return renderable - - renderable = self.render(task) - self._renderable_cache[task.id] = (current_time, renderable) - return renderable - - @abstractmethod - def render(self, task: "Task") -> RenderableType: - """Should return a renderable object.""" - - -class RenderableColumn(ProgressColumn): - """A column to insert an arbitrary column. - - Args: - renderable (RenderableType, optional): Any renderable. Defaults to empty string. - """ - - def __init__( - self, renderable: RenderableType = "", *, table_column: Optional[Column] = None - ): - self.renderable = renderable - super().__init__(table_column=table_column) - - def render(self, task: "Task") -> RenderableType: - return self.renderable - - -class SpinnerColumn(ProgressColumn): - """A column with a 'spinner' animation. - - Args: - spinner_name (str, optional): Name of spinner animation. Defaults to "dots". - style (StyleType, optional): Style of spinner. Defaults to "progress.spinner". - speed (float, optional): Speed factor of spinner. Defaults to 1.0. - finished_text (TextType, optional): Text used when task is finished. Defaults to " ". - """ - - def __init__( - self, - spinner_name: str = "dots", - style: Optional[StyleType] = "progress.spinner", - speed: float = 1.0, - finished_text: TextType = " ", - table_column: Optional[Column] = None, - ): - self.spinner = Spinner(spinner_name, style=style, speed=speed) - self.finished_text = ( - Text.from_markup(finished_text) - if isinstance(finished_text, str) - else finished_text - ) - super().__init__(table_column=table_column) - - def set_spinner( - self, - spinner_name: str, - spinner_style: Optional[StyleType] = "progress.spinner", - speed: float = 1.0, - ) -> None: - """Set a new spinner. - - Args: - spinner_name (str): Spinner name, see python -m rich.spinner. - spinner_style (Optional[StyleType], optional): Spinner style. Defaults to "progress.spinner". - speed (float, optional): Speed factor of spinner. Defaults to 1.0. - """ - self.spinner = Spinner(spinner_name, style=spinner_style, speed=speed) - - def render(self, task: "Task") -> RenderableType: - text = ( - self.finished_text - if task.finished - else self.spinner.render(task.get_time()) - ) - return text - - -class TextColumn(ProgressColumn): - """A column containing text.""" - - def __init__( - self, - text_format: str, - style: StyleType = "none", - justify: JustifyMethod = "left", - markup: bool = True, - highlighter: Optional[Highlighter] = None, - table_column: Optional[Column] = None, - ) -> None: - self.text_format = text_format - self.justify: JustifyMethod = justify - self.style = style - self.markup = markup - self.highlighter = highlighter - super().__init__(table_column=table_column or Column(no_wrap=True)) - - def render(self, task: "Task") -> Text: - _text = self.text_format.format(task=task) - if self.markup: - text = Text.from_markup(_text, style=self.style, justify=self.justify) - else: - text = Text(_text, style=self.style, justify=self.justify) - if self.highlighter: - self.highlighter.highlight(text) - return text - - -class BarColumn(ProgressColumn): - """Renders a visual progress bar. - - Args: - bar_width (Optional[int], optional): Width of bar or None for full width. Defaults to 40. - style (StyleType, optional): Style for the bar background. Defaults to "bar.back". - complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". - finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". - pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". - """ - - def __init__( - self, - bar_width: Optional[int] = 40, - style: StyleType = "bar.back", - complete_style: StyleType = "bar.complete", - finished_style: StyleType = "bar.finished", - pulse_style: StyleType = "bar.pulse", - table_column: Optional[Column] = None, - ) -> None: - self.bar_width = bar_width - self.style = style - self.complete_style = complete_style - self.finished_style = finished_style - self.pulse_style = pulse_style - super().__init__(table_column=table_column) - - def render(self, task: "Task") -> ProgressBar: - """Gets a progress bar widget for a task.""" - return ProgressBar( - total=max(0, task.total) if task.total is not None else None, - completed=max(0, task.completed), - width=None if self.bar_width is None else max(1, self.bar_width), - pulse=not task.started, - animation_time=task.get_time(), - style=self.style, - complete_style=self.complete_style, - finished_style=self.finished_style, - pulse_style=self.pulse_style, - ) - - -class TimeElapsedColumn(ProgressColumn): - """Renders time elapsed.""" - - def render(self, task: "Task") -> Text: - """Show time elapsed.""" - elapsed = task.finished_time if task.finished else task.elapsed - if elapsed is None: - return Text("-:--:--", style="progress.elapsed") - delta = timedelta(seconds=max(0, int(elapsed))) - return Text(str(delta), style="progress.elapsed") - - -class TaskProgressColumn(TextColumn): - """Show task progress as a percentage. - - Args: - text_format (str, optional): Format for percentage display. Defaults to "[progress.percentage]{task.percentage:>3.0f}%". - text_format_no_percentage (str, optional): Format if percentage is unknown. Defaults to "". - style (StyleType, optional): Style of output. Defaults to "none". - justify (JustifyMethod, optional): Text justification. Defaults to "left". - markup (bool, optional): Enable markup. Defaults to True. - highlighter (Optional[Highlighter], optional): Highlighter to apply to output. Defaults to None. - table_column (Optional[Column], optional): Table Column to use. Defaults to None. - show_speed (bool, optional): Show speed if total is unknown. Defaults to False. - """ - - def __init__( - self, - text_format: str = "[progress.percentage]{task.percentage:>3.0f}%", - text_format_no_percentage: str = "", - style: StyleType = "none", - justify: JustifyMethod = "left", - markup: bool = True, - highlighter: Optional[Highlighter] = None, - table_column: Optional[Column] = None, - show_speed: bool = False, - ) -> None: - self.text_format_no_percentage = text_format_no_percentage - self.show_speed = show_speed - super().__init__( - text_format=text_format, - style=style, - justify=justify, - markup=markup, - highlighter=highlighter, - table_column=table_column, - ) - - @classmethod - def render_speed(cls, speed: Optional[float]) -> Text: - """Render the speed in iterations per second. - - Args: - task (Task): A Task object. - - Returns: - Text: Text object containing the task speed. - """ - if speed is None: - return Text("", style="progress.percentage") - unit, suffix = filesize.pick_unit_and_suffix( - int(speed), - ["", "×10³", "×10⁶", "×10⁹", "×10¹²"], - 1000, - ) - data_speed = speed / unit - return Text(f"{data_speed:.1f}{suffix} it/s", style="progress.percentage") - - def render(self, task: "Task") -> Text: - if task.total is None and self.show_speed: - return self.render_speed(task.finished_speed or task.speed) - text_format = ( - self.text_format_no_percentage if task.total is None else self.text_format - ) - _text = text_format.format(task=task) - if self.markup: - text = Text.from_markup(_text, style=self.style, justify=self.justify) - else: - text = Text(_text, style=self.style, justify=self.justify) - if self.highlighter: - self.highlighter.highlight(text) - return text - - -class TimeRemainingColumn(ProgressColumn): - """Renders estimated time remaining. - - Args: - compact (bool, optional): Render MM:SS when time remaining is less than an hour. Defaults to False. - elapsed_when_finished (bool, optional): Render time elapsed when the task is finished. Defaults to False. - """ - - # Only refresh twice a second to prevent jitter - max_refresh = 0.5 - - def __init__( - self, - compact: bool = False, - elapsed_when_finished: bool = False, - table_column: Optional[Column] = None, - ): - self.compact = compact - self.elapsed_when_finished = elapsed_when_finished - super().__init__(table_column=table_column) - - def render(self, task: "Task") -> Text: - """Show time remaining.""" - if self.elapsed_when_finished and task.finished: - task_time = task.finished_time - style = "progress.elapsed" - else: - task_time = task.time_remaining - style = "progress.remaining" - - if task.total is None: - return Text("", style=style) - - if task_time is None: - return Text("--:--" if self.compact else "-:--:--", style=style) - - # Based on https://github.com/tqdm/tqdm/blob/master/tqdm/std.py - minutes, seconds = divmod(int(task_time), 60) - hours, minutes = divmod(minutes, 60) - - if self.compact and not hours: - formatted = f"{minutes:02d}:{seconds:02d}" - else: - formatted = f"{hours:d}:{minutes:02d}:{seconds:02d}" - - return Text(formatted, style=style) - - -class FileSizeColumn(ProgressColumn): - """Renders completed filesize.""" - - def render(self, task: "Task") -> Text: - """Show data completed.""" - data_size = filesize.decimal(int(task.completed)) - return Text(data_size, style="progress.filesize") - - -class TotalFileSizeColumn(ProgressColumn): - """Renders total filesize.""" - - def render(self, task: "Task") -> Text: - """Show data completed.""" - data_size = filesize.decimal(int(task.total)) if task.total is not None else "" - return Text(data_size, style="progress.filesize.total") - - -class MofNCompleteColumn(ProgressColumn): - """Renders completed count/total, e.g. ' 10/1000'. - - Best for bounded tasks with int quantities. - - Space pads the completed count so that progress length does not change as task progresses - past powers of 10. - - Args: - separator (str, optional): Text to separate completed and total values. Defaults to "/". - """ - - def __init__(self, separator: str = "/", table_column: Optional[Column] = None): - self.separator = separator - super().__init__(table_column=table_column) - - def render(self, task: "Task") -> Text: - """Show completed/total.""" - completed = int(task.completed) - total = int(task.total) if task.total is not None else "?" - total_width = len(str(total)) - return Text( - f"{completed:{total_width}d}{self.separator}{total}", - style="progress.download", - ) - - -class DownloadColumn(ProgressColumn): - """Renders file size downloaded and total, e.g. '0.5/2.3 GB'. - - Args: - binary_units (bool, optional): Use binary units, KiB, MiB etc. Defaults to False. - """ - - def __init__( - self, binary_units: bool = False, table_column: Optional[Column] = None - ) -> None: - self.binary_units = binary_units - super().__init__(table_column=table_column) - - def render(self, task: "Task") -> Text: - """Calculate common unit for completed and total.""" - completed = int(task.completed) - - unit_and_suffix_calculation_base = ( - int(task.total) if task.total is not None else completed - ) - if self.binary_units: - unit, suffix = filesize.pick_unit_and_suffix( - unit_and_suffix_calculation_base, - ["bytes", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"], - 1024, - ) - else: - unit, suffix = filesize.pick_unit_and_suffix( - unit_and_suffix_calculation_base, - ["bytes", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"], - 1000, - ) - precision = 0 if unit == 1 else 1 - - completed_ratio = completed / unit - completed_str = f"{completed_ratio:,.{precision}f}" - - if task.total is not None: - total = int(task.total) - total_ratio = total / unit - total_str = f"{total_ratio:,.{precision}f}" - else: - total_str = "?" - - download_status = f"{completed_str}/{total_str} {suffix}" - download_text = Text(download_status, style="progress.download") - return download_text - - -class TransferSpeedColumn(ProgressColumn): - """Renders human readable transfer speed.""" - - def render(self, task: "Task") -> Text: - """Show data transfer speed.""" - speed = task.finished_speed or task.speed - if speed is None: - return Text("?", style="progress.data.speed") - data_speed = filesize.decimal(int(speed)) - return Text(f"{data_speed}/s", style="progress.data.speed") - - -class ProgressSample(NamedTuple): - """Sample of progress for a given time.""" - - timestamp: float - """Timestamp of sample.""" - completed: float - """Number of steps completed.""" - - -@dataclass -class Task: - """Information regarding a progress task. - - This object should be considered read-only outside of the :class:`~Progress` class. - - """ - - id: TaskID - """Task ID associated with this task (used in Progress methods).""" - - description: str - """str: Description of the task.""" - - total: Optional[float] - """Optional[float]: Total number of steps in this task.""" - - completed: float - """float: Number of steps completed""" - - _get_time: GetTimeCallable - """Callable to get the current time.""" - - finished_time: Optional[float] = None - """float: Time task was finished.""" - - visible: bool = True - """bool: Indicates if this task is visible in the progress display.""" - - fields: Dict[str, Any] = field(default_factory=dict) - """dict: Arbitrary fields passed in via Progress.update.""" - - start_time: Optional[float] = field(default=None, init=False, repr=False) - """Optional[float]: Time this task was started, or None if not started.""" - - stop_time: Optional[float] = field(default=None, init=False, repr=False) - """Optional[float]: Time this task was stopped, or None if not stopped.""" - - finished_speed: Optional[float] = None - """Optional[float]: The last speed for a finished task.""" - - _progress: Deque[ProgressSample] = field( - default_factory=lambda: deque(maxlen=1000), init=False, repr=False - ) - - _lock: RLock = field(repr=False, default_factory=RLock) - """Thread lock.""" - - def get_time(self) -> float: - """float: Get the current time, in seconds.""" - return self._get_time() - - @property - def started(self) -> bool: - """bool: Check if the task as started.""" - return self.start_time is not None - - @property - def remaining(self) -> Optional[float]: - """Optional[float]: Get the number of steps remaining, if a non-None total was set.""" - if self.total is None: - return None - return self.total - self.completed - - @property - def elapsed(self) -> Optional[float]: - """Optional[float]: Time elapsed since task was started, or ``None`` if the task hasn't started.""" - if self.start_time is None: - return None - if self.stop_time is not None: - return self.stop_time - self.start_time - return self.get_time() - self.start_time - - @property - def finished(self) -> bool: - """Check if the task has finished.""" - return self.finished_time is not None - - @property - def percentage(self) -> float: - """float: Get progress of task as a percentage. If a None total was set, returns 0""" - if not self.total: - return 0.0 - completed = (self.completed / self.total) * 100.0 - completed = min(100.0, max(0.0, completed)) - return completed - - @property - def speed(self) -> Optional[float]: - """Optional[float]: Get the estimated speed in steps per second.""" - if self.start_time is None: - return None - with self._lock: - progress = self._progress - if not progress: - return None - total_time = progress[-1].timestamp - progress[0].timestamp - if total_time == 0: - return None - iter_progress = iter(progress) - next(iter_progress) - total_completed = sum(sample.completed for sample in iter_progress) - speed = total_completed / total_time - return speed - - @property - def time_remaining(self) -> Optional[float]: - """Optional[float]: Get estimated time to completion, or ``None`` if no data.""" - if self.finished: - return 0.0 - speed = self.speed - if not speed: - return None - remaining = self.remaining - if remaining is None: - return None - estimate = ceil(remaining / speed) - return estimate - - def _reset(self) -> None: - """Reset progress.""" - self._progress.clear() - self.finished_time = None - self.finished_speed = None - - -class Progress(JupyterMixin): - """Renders an auto-updating progress bar(s). - - Args: - console (Console, optional): Optional Console instance. Default will an internal Console instance writing to stdout. - auto_refresh (bool, optional): Enable auto refresh. If disabled, you will need to call `refresh()`. - refresh_per_second (Optional[float], optional): Number of times per second to refresh the progress information or None to use default (10). Defaults to None. - speed_estimate_period: (float, optional): Period (in seconds) used to calculate the speed estimate. Defaults to 30. - transient: (bool, optional): Clear the progress on exit. Defaults to False. - redirect_stdout: (bool, optional): Enable redirection of stdout, so ``print`` may be used. Defaults to True. - redirect_stderr: (bool, optional): Enable redirection of stderr. Defaults to True. - get_time: (Callable, optional): A callable that gets the current time, or None to use Console.get_time. Defaults to None. - disable (bool, optional): Disable progress display. Defaults to False - expand (bool, optional): Expand tasks table to fit width. Defaults to False. - """ - - def __init__( - self, - *columns: Union[str, ProgressColumn], - console: Optional[Console] = None, - auto_refresh: bool = True, - refresh_per_second: float = 10, - speed_estimate_period: float = 30.0, - transient: bool = False, - redirect_stdout: bool = True, - redirect_stderr: bool = True, - get_time: Optional[GetTimeCallable] = None, - disable: bool = False, - expand: bool = False, - ) -> None: - assert refresh_per_second > 0, "refresh_per_second must be > 0" - self._lock = RLock() - self.columns = columns or self.get_default_columns() - self.speed_estimate_period = speed_estimate_period - - self.disable = disable - self.expand = expand - self._tasks: Dict[TaskID, Task] = {} - self._task_index: TaskID = TaskID(0) - self.live = Live( - console=console or get_console(), - auto_refresh=auto_refresh, - refresh_per_second=refresh_per_second, - transient=transient, - redirect_stdout=redirect_stdout, - redirect_stderr=redirect_stderr, - get_renderable=self.get_renderable, - ) - self.get_time = get_time or self.console.get_time - self.print = self.console.print - self.log = self.console.log - - @classmethod - def get_default_columns(cls) -> Tuple[ProgressColumn, ...]: - """Get the default columns used for a new Progress instance: - - a text column for the description (TextColumn) - - the bar itself (BarColumn) - - a text column showing completion percentage (TextColumn) - - an estimated-time-remaining column (TimeRemainingColumn) - If the Progress instance is created without passing a columns argument, - the default columns defined here will be used. - - You can also create a Progress instance using custom columns before - and/or after the defaults, as in this example: - - progress = Progress( - SpinnerColumn(), - *Progress.get_default_columns(), - "Elapsed:", - TimeElapsedColumn(), - ) - - This code shows the creation of a Progress display, containing - a spinner to the left, the default columns, and a labeled elapsed - time column. - """ - return ( - TextColumn("[progress.description]{task.description}"), - BarColumn(), - TaskProgressColumn(), - TimeRemainingColumn(), - ) - - @property - def console(self) -> Console: - return self.live.console - - @property - def tasks(self) -> List[Task]: - """Get a list of Task instances.""" - with self._lock: - return list(self._tasks.values()) - - @property - def task_ids(self) -> List[TaskID]: - """A list of task IDs.""" - with self._lock: - return list(self._tasks.keys()) - - @property - def finished(self) -> bool: - """Check if all tasks have been completed.""" - with self._lock: - if not self._tasks: - return True - return all(task.finished for task in self._tasks.values()) - - def start(self) -> None: - """Start the progress display.""" - if not self.disable: - self.live.start(refresh=True) - - def stop(self) -> None: - """Stop the progress display.""" - self.live.stop() - if not self.console.is_interactive: - self.console.print() - - def __enter__(self) -> "Progress": - self.start() - return self - - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: - self.stop() - - def track( - self, - sequence: Union[Iterable[ProgressType], Sequence[ProgressType]], - total: Optional[float] = None, - task_id: Optional[TaskID] = None, - description: str = "Working...", - update_period: float = 0.1, - ) -> Iterable[ProgressType]: - """Track progress by iterating over a sequence. - - Args: - sequence (Sequence[ProgressType]): A sequence of values you want to iterate over and track progress. - total: (float, optional): Total number of steps. Default is len(sequence). - task_id: (TaskID): Task to track. Default is new task. - description: (str, optional): Description of task, if new task is created. - update_period (float, optional): Minimum time (in seconds) between calls to update(). Defaults to 0.1. - - Returns: - Iterable[ProgressType]: An iterable of values taken from the provided sequence. - """ - if total is None: - total = float(length_hint(sequence)) or None - - if task_id is None: - task_id = self.add_task(description, total=total) - else: - self.update(task_id, total=total) - - if self.live.auto_refresh: - with _TrackThread(self, task_id, update_period) as track_thread: - for value in sequence: - yield value - track_thread.completed += 1 - else: - advance = self.advance - refresh = self.refresh - for value in sequence: - yield value - advance(task_id, 1) - refresh() - - def wrap_file( - self, - file: BinaryIO, - total: Optional[int] = None, - *, - task_id: Optional[TaskID] = None, - description: str = "Reading...", - ) -> BinaryIO: - """Track progress file reading from a binary file. - - Args: - file (BinaryIO): A file-like object opened in binary mode. - total (int, optional): Total number of bytes to read. This must be provided unless a task with a total is also given. - task_id (TaskID): Task to track. Default is new task. - description (str, optional): Description of task, if new task is created. - - Returns: - BinaryIO: A readable file-like object in binary mode. - - Raises: - ValueError: When no total value can be extracted from the arguments or the task. - """ - # attempt to recover the total from the task - total_bytes: Optional[float] = None - if total is not None: - total_bytes = total - elif task_id is not None: - with self._lock: - total_bytes = self._tasks[task_id].total - if total_bytes is None: - raise ValueError( - f"unable to get the total number of bytes, please specify 'total'" - ) - - # update total of task or create new task - if task_id is None: - task_id = self.add_task(description, total=total_bytes) - else: - self.update(task_id, total=total_bytes) - - return _Reader(file, self, task_id, close_handle=False) - - @typing.overload - def open( - self, - file: Union[str, "PathLike[str]", bytes], - mode: Literal["rb"], - buffering: int = -1, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, - *, - total: Optional[int] = None, - task_id: Optional[TaskID] = None, - description: str = "Reading...", - ) -> BinaryIO: - pass - - @typing.overload - def open( - self, - file: Union[str, "PathLike[str]", bytes], - mode: Union[Literal["r"], Literal["rt"]], - buffering: int = -1, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, - *, - total: Optional[int] = None, - task_id: Optional[TaskID] = None, - description: str = "Reading...", - ) -> TextIO: - pass - - def open( - self, - file: Union[str, "PathLike[str]", bytes], - mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r", - buffering: int = -1, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, - *, - total: Optional[int] = None, - task_id: Optional[TaskID] = None, - description: str = "Reading...", - ) -> Union[BinaryIO, TextIO]: - """Track progress while reading from a binary file. - - Args: - path (Union[str, PathLike[str]]): The path to the file to read. - mode (str): The mode to use to open the file. Only supports "r", "rb" or "rt". - buffering (int): The buffering strategy to use, see :func:`io.open`. - encoding (str, optional): The encoding to use when reading in text mode, see :func:`io.open`. - errors (str, optional): The error handling strategy for decoding errors, see :func:`io.open`. - newline (str, optional): The strategy for handling newlines in text mode, see :func:`io.open`. - total (int, optional): Total number of bytes to read. If none given, os.stat(path).st_size is used. - task_id (TaskID): Task to track. Default is new task. - description (str, optional): Description of task, if new task is created. - - Returns: - BinaryIO: A readable file-like object in binary mode. - - Raises: - ValueError: When an invalid mode is given. - """ - # normalize the mode (always rb, rt) - _mode = "".join(sorted(mode, reverse=False)) - if _mode not in ("br", "rt", "r"): - raise ValueError("invalid mode {!r}".format(mode)) - - # patch buffering to provide the same behaviour as the builtin `open` - line_buffering = buffering == 1 - if _mode == "br" and buffering == 1: - warnings.warn( - "line buffering (buffering=1) isn't supported in binary mode, the default buffer size will be used", - RuntimeWarning, - ) - buffering = -1 - elif _mode in ("rt", "r"): - if buffering == 0: - raise ValueError("can't have unbuffered text I/O") - elif buffering == 1: - buffering = -1 - - # attempt to get the total with `os.stat` - if total is None: - total = stat(file).st_size - - # update total of task or create new task - if task_id is None: - task_id = self.add_task(description, total=total) - else: - self.update(task_id, total=total) - - # open the file in binary mode, - handle = io.open(file, "rb", buffering=buffering) - reader = _Reader(handle, self, task_id, close_handle=True) - - # wrap the reader in a `TextIOWrapper` if text mode - if mode in ("r", "rt"): - return io.TextIOWrapper( - reader, - encoding=encoding, - errors=errors, - newline=newline, - line_buffering=line_buffering, - ) - - return reader - - def start_task(self, task_id: TaskID) -> None: - """Start a task. - - Starts a task (used when calculating elapsed time). You may need to call this manually, - if you called ``add_task`` with ``start=False``. - - Args: - task_id (TaskID): ID of task. - """ - with self._lock: - task = self._tasks[task_id] - if task.start_time is None: - task.start_time = self.get_time() - - def stop_task(self, task_id: TaskID) -> None: - """Stop a task. - - This will freeze the elapsed time on the task. - - Args: - task_id (TaskID): ID of task. - """ - with self._lock: - task = self._tasks[task_id] - current_time = self.get_time() - if task.start_time is None: - task.start_time = current_time - task.stop_time = current_time - - def update( - self, - task_id: TaskID, - *, - total: Optional[float] = None, - completed: Optional[float] = None, - advance: Optional[float] = None, - description: Optional[str] = None, - visible: Optional[bool] = None, - refresh: bool = False, - **fields: Any, - ) -> None: - """Update information associated with a task. - - Args: - task_id (TaskID): Task id (returned by add_task). - total (float, optional): Updates task.total if not None. - completed (float, optional): Updates task.completed if not None. - advance (float, optional): Add a value to task.completed if not None. - description (str, optional): Change task description if not None. - visible (bool, optional): Set visible flag if not None. - refresh (bool): Force a refresh of progress information. Default is False. - **fields (Any): Additional data fields required for rendering. - """ - with self._lock: - task = self._tasks[task_id] - completed_start = task.completed - - if total is not None and total != task.total: - task.total = total - task._reset() - if advance is not None: - task.completed += advance - if completed is not None: - task.completed = completed - if description is not None: - task.description = description - if visible is not None: - task.visible = visible - task.fields.update(fields) - update_completed = task.completed - completed_start - - current_time = self.get_time() - old_sample_time = current_time - self.speed_estimate_period - _progress = task._progress - - popleft = _progress.popleft - while _progress and _progress[0].timestamp < old_sample_time: - popleft() - if update_completed > 0: - _progress.append(ProgressSample(current_time, update_completed)) - if ( - task.total is not None - and task.completed >= task.total - and task.finished_time is None - ): - task.finished_time = task.elapsed - - if refresh: - self.refresh() - - def reset( - self, - task_id: TaskID, - *, - start: bool = True, - total: Optional[float] = None, - completed: int = 0, - visible: Optional[bool] = None, - description: Optional[str] = None, - **fields: Any, - ) -> None: - """Reset a task so completed is 0 and the clock is reset. - - Args: - task_id (TaskID): ID of task. - start (bool, optional): Start the task after reset. Defaults to True. - total (float, optional): New total steps in task, or None to use current total. Defaults to None. - completed (int, optional): Number of steps completed. Defaults to 0. - visible (bool, optional): Enable display of the task. Defaults to True. - description (str, optional): Change task description if not None. Defaults to None. - **fields (str): Additional data fields required for rendering. - """ - current_time = self.get_time() - with self._lock: - task = self._tasks[task_id] - task._reset() - task.start_time = current_time if start else None - if total is not None: - task.total = total - task.completed = completed - if visible is not None: - task.visible = visible - if fields: - task.fields = fields - if description is not None: - task.description = description - task.finished_time = None - self.refresh() - - def advance(self, task_id: TaskID, advance: float = 1) -> None: - """Advance task by a number of steps. - - Args: - task_id (TaskID): ID of task. - advance (float): Number of steps to advance. Default is 1. - """ - current_time = self.get_time() - with self._lock: - task = self._tasks[task_id] - completed_start = task.completed - task.completed += advance - update_completed = task.completed - completed_start - old_sample_time = current_time - self.speed_estimate_period - _progress = task._progress - - popleft = _progress.popleft - while _progress and _progress[0].timestamp < old_sample_time: - popleft() - while len(_progress) > 1000: - popleft() - _progress.append(ProgressSample(current_time, update_completed)) - if ( - task.total is not None - and task.completed >= task.total - and task.finished_time is None - ): - task.finished_time = task.elapsed - task.finished_speed = task.speed - - def refresh(self) -> None: - """Refresh (render) the progress information.""" - if not self.disable and self.live.is_started: - self.live.refresh() - - def get_renderable(self) -> RenderableType: - """Get a renderable for the progress display.""" - renderable = Group(*self.get_renderables()) - return renderable - - def get_renderables(self) -> Iterable[RenderableType]: - """Get a number of renderables for the progress display.""" - table = self.make_tasks_table(self.tasks) - yield table - - def make_tasks_table(self, tasks: Iterable[Task]) -> Table: - """Get a table to render the Progress display. - - Args: - tasks (Iterable[Task]): An iterable of Task instances, one per row of the table. - - Returns: - Table: A table instance. - """ - table_columns = ( - ( - Column(no_wrap=True) - if isinstance(_column, str) - else _column.get_table_column().copy() - ) - for _column in self.columns - ) - table = Table.grid(*table_columns, padding=(0, 1), expand=self.expand) - - for task in tasks: - if task.visible: - table.add_row( - *( - ( - column.format(task=task) - if isinstance(column, str) - else column(task) - ) - for column in self.columns - ) - ) - return table - - def __rich__(self) -> RenderableType: - """Makes the Progress class itself renderable.""" - with self._lock: - return self.get_renderable() - - def add_task( - self, - description: str, - start: bool = True, - total: Optional[float] = 100.0, - completed: int = 0, - visible: bool = True, - **fields: Any, - ) -> TaskID: - """Add a new 'task' to the Progress display. - - Args: - description (str): A description of the task. - start (bool, optional): Start the task immediately (to calculate elapsed time). If set to False, - you will need to call `start` manually. Defaults to True. - total (float, optional): Number of total steps in the progress if known. - Set to None to render a pulsing animation. Defaults to 100. - completed (int, optional): Number of steps completed so far. Defaults to 0. - visible (bool, optional): Enable display of the task. Defaults to True. - **fields (str): Additional data fields required for rendering. - - Returns: - TaskID: An ID you can use when calling `update`. - """ - with self._lock: - task = Task( - self._task_index, - description, - total, - completed, - visible=visible, - fields=fields, - _get_time=self.get_time, - _lock=self._lock, - ) - self._tasks[self._task_index] = task - if start: - self.start_task(self._task_index) - new_task_index = self._task_index - self._task_index = TaskID(int(self._task_index) + 1) - self.refresh() - return new_task_index - - def remove_task(self, task_id: TaskID) -> None: - """Delete a task if it exists. - - Args: - task_id (TaskID): A task ID. - - """ - with self._lock: - del self._tasks[task_id] - - -if __name__ == "__main__": # pragma: no coverage - import random - import time - - from .panel import Panel - from .rule import Rule - from .syntax import Syntax - from .table import Table - - syntax = Syntax( - '''def loop_last(values: Iterable[T]) -> Iterable[Tuple[bool, T]]: - """Iterate and generate a tuple with a flag for last value.""" - iter_values = iter(values) - try: - previous_value = next(iter_values) - except StopIteration: - return - for value in iter_values: - yield False, previous_value - previous_value = value - yield True, previous_value''', - "python", - line_numbers=True, - ) - - table = Table("foo", "bar", "baz") - table.add_row("1", "2", "3") - - progress_renderables = [ - "Text may be printed while the progress bars are rendering.", - Panel("In fact, [i]any[/i] renderable will work"), - "Such as [magenta]tables[/]...", - table, - "Pretty printed structures...", - {"type": "example", "text": "Pretty printed"}, - "Syntax...", - syntax, - Rule("Give it a try!"), - ] - - from itertools import cycle - - examples = cycle(progress_renderables) - - console = Console(record=True) - - with Progress( - SpinnerColumn(), - *Progress.get_default_columns(), - TimeElapsedColumn(), - console=console, - transient=False, - ) as progress: - task1 = progress.add_task("[red]Downloading", total=1000) - task2 = progress.add_task("[green]Processing", total=1000) - task3 = progress.add_task("[yellow]Thinking", total=None) - - while not progress.finished: - progress.update(task1, advance=0.5) - progress.update(task2, advance=0.3) - time.sleep(0.01) - if random.randint(0, 100) < 1: - progress.log(next(examples)) |