File: //usr/local/lib/python3.9/site-packages/prompt_toolkit/shortcuts/progress_bar/base.py
"""
Progress bar implementation on top of prompt_toolkit.
::
with ProgressBar(...) as pb:
for item in pb(data):
...
"""
from __future__ import annotations
import contextvars
import datetime
import functools
import os
import signal
import threading
import traceback
from typing import (
Callable,
Generic,
Iterable,
Iterator,
Sequence,
Sized,
TextIO,
TypeVar,
cast,
)
from prompt_toolkit.application import Application
from prompt_toolkit.application.current import get_app_session
from prompt_toolkit.filters import Condition, is_done, renderer_height_is_known
from prompt_toolkit.formatted_text import (
AnyFormattedText,
StyleAndTextTuples,
to_formatted_text,
)
from prompt_toolkit.input import Input
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.key_binding.key_processor import KeyPressEvent
from prompt_toolkit.layout import (
ConditionalContainer,
FormattedTextControl,
HSplit,
Layout,
VSplit,
Window,
)
from prompt_toolkit.layout.controls import UIContent, UIControl
from prompt_toolkit.layout.dimension import AnyDimension, D
from prompt_toolkit.output import ColorDepth, Output
from prompt_toolkit.styles import BaseStyle
from prompt_toolkit.utils import in_main_thread
from .formatters import Formatter, create_default_formatters
__all__ = ["ProgressBar"]
E = KeyPressEvent
_SIGWINCH = getattr(signal, "SIGWINCH", None)
def create_key_bindings(cancel_callback: Callable[[], None] | None) -> KeyBindings:
"""
Key bindings handled by the progress bar.
(The main thread is not supposed to handle any key bindings.)
"""
kb = KeyBindings()
@kb.add("c-l")
def _clear(event: E) -> None:
event.app.renderer.clear()
if cancel_callback is not None:
@kb.add("c-c")
def _interrupt(event: E) -> None:
"Kill the 'body' of the progress bar, but only if we run from the main thread."
assert cancel_callback is not None
cancel_callback()
return kb
_T = TypeVar("_T")
class ProgressBar:
"""
Progress bar context manager.
Usage ::
with ProgressBar(...) as pb:
for item in pb(data):
...
:param title: Text to be displayed above the progress bars. This can be a
callable or formatted text as well.
:param formatters: List of :class:`.Formatter` instances.
:param bottom_toolbar: Text to be displayed in the bottom toolbar. This
can be a callable or formatted text.
:param style: :class:`prompt_toolkit.styles.BaseStyle` instance.
:param key_bindings: :class:`.KeyBindings` instance.
:param cancel_callback: Callback function that's called when control-c is
pressed by the user. This can be used for instance to start "proper"
cancellation if the wrapped code supports it.
:param file: The file object used for rendering, by default `sys.stderr` is used.
:param color_depth: `prompt_toolkit` `ColorDepth` instance.
:param output: :class:`~prompt_toolkit.output.Output` instance.
:param input: :class:`~prompt_toolkit.input.Input` instance.
"""
def __init__(
self,
title: AnyFormattedText = None,
formatters: Sequence[Formatter] | None = None,
bottom_toolbar: AnyFormattedText = None,
style: BaseStyle | None = None,
key_bindings: KeyBindings | None = None,
cancel_callback: Callable[[], None] | None = None,
file: TextIO | None = None,
color_depth: ColorDepth | None = None,
output: Output | None = None,
input: Input | None = None,
) -> None:
self.title = title
self.formatters = formatters or create_default_formatters()
self.bottom_toolbar = bottom_toolbar
self.counters: list[ProgressBarCounter[object]] = []
self.style = style
self.key_bindings = key_bindings
self.cancel_callback = cancel_callback
# If no `cancel_callback` was given, and we're creating the progress
# bar from the main thread. Cancel by sending a `KeyboardInterrupt` to
# the main thread.
if self.cancel_callback is None and in_main_thread():
def keyboard_interrupt_to_main_thread() -> None:
os.kill(os.getpid(), signal.SIGINT)
self.cancel_callback = keyboard_interrupt_to_main_thread
# Note that we use __stderr__ as default error output, because that
# works best with `patch_stdout`.
self.color_depth = color_depth
self.output = output or get_app_session().output
self.input = input or get_app_session().input
self._thread: threading.Thread | None = None
self._has_sigwinch = False
self._app_started = threading.Event()
def __enter__(self) -> ProgressBar:
# Create UI Application.
title_toolbar = ConditionalContainer(
Window(
FormattedTextControl(lambda: self.title),
height=1,
style="class:progressbar,title",
),
filter=Condition(lambda: self.title is not None),
)
bottom_toolbar = ConditionalContainer(
Window(
FormattedTextControl(
lambda: self.bottom_toolbar, style="class:bottom-toolbar.text"
),
style="class:bottom-toolbar",
height=1,
),
filter=~is_done
& renderer_height_is_known
& Condition(lambda: self.bottom_toolbar is not None),
)
def width_for_formatter(formatter: Formatter) -> AnyDimension:
# Needs to be passed as callable (partial) to the 'width'
# parameter, because we want to call it on every resize.
return formatter.get_width(progress_bar=self)
progress_controls = [
Window(
content=_ProgressControl(self, f, self.cancel_callback),
width=functools.partial(width_for_formatter, f),
)
for f in self.formatters
]
self.app: Application[None] = Application(
min_redraw_interval=0.05,
layout=Layout(
HSplit(
[
title_toolbar,
VSplit(
progress_controls,
height=lambda: D(
preferred=len(self.counters), max=len(self.counters)
),
),
Window(),
bottom_toolbar,
]
)
),
style=self.style,
key_bindings=self.key_bindings,
refresh_interval=0.3,
color_depth=self.color_depth,
output=self.output,
input=self.input,
)
# Run application in different thread.
def run() -> None:
try:
self.app.run(pre_run=self._app_started.set)
except BaseException as e:
traceback.print_exc()
print(e)
ctx: contextvars.Context = contextvars.copy_context()
self._thread = threading.Thread(target=ctx.run, args=(run,))
self._thread.start()
return self
def __exit__(self, *a: object) -> None:
# Wait for the app to be started. Make sure we don't quit earlier,
# otherwise `self.app.exit` won't terminate the app because
# `self.app.future` has not yet been set.
self._app_started.wait()
# Quit UI application.
if self.app.is_running and self.app.loop is not None:
self.app.loop.call_soon_threadsafe(self.app.exit)
if self._thread is not None:
self._thread.join()
def __call__(
self,
data: Iterable[_T] | None = None,
label: AnyFormattedText = "",
remove_when_done: bool = False,
total: int | None = None,
) -> ProgressBarCounter[_T]:
"""
Start a new counter.
:param label: Title text or description for this progress. (This can be
formatted text as well).
:param remove_when_done: When `True`, hide this progress bar.
:param total: Specify the maximum value if it can't be calculated by
calling ``len``.
"""
counter = ProgressBarCounter(
self, data, label=label, remove_when_done=remove_when_done, total=total
)
self.counters.append(counter)
return counter
def invalidate(self) -> None:
self.app.invalidate()
class _ProgressControl(UIControl):
"""
User control for the progress bar.
"""
def __init__(
self,
progress_bar: ProgressBar,
formatter: Formatter,
cancel_callback: Callable[[], None] | None,
) -> None:
self.progress_bar = progress_bar
self.formatter = formatter
self._key_bindings = create_key_bindings(cancel_callback)
def create_content(self, width: int, height: int) -> UIContent:
items: list[StyleAndTextTuples] = []
for pr in self.progress_bar.counters:
try:
text = self.formatter.format(self.progress_bar, pr, width)
except BaseException:
traceback.print_exc()
text = "ERROR"
items.append(to_formatted_text(text))
def get_line(i: int) -> StyleAndTextTuples:
return items[i]
return UIContent(get_line=get_line, line_count=len(items), show_cursor=False)
def is_focusable(self) -> bool:
return True # Make sure that the key bindings work.
def get_key_bindings(self) -> KeyBindings:
return self._key_bindings
_CounterItem = TypeVar("_CounterItem", covariant=True)
class ProgressBarCounter(Generic[_CounterItem]):
"""
An individual counter (A progress bar can have multiple counters).
"""
def __init__(
self,
progress_bar: ProgressBar,
data: Iterable[_CounterItem] | None = None,
label: AnyFormattedText = "",
remove_when_done: bool = False,
total: int | None = None,
) -> None:
self.start_time = datetime.datetime.now()
self.stop_time: datetime.datetime | None = None
self.progress_bar = progress_bar
self.data = data
self.items_completed = 0
self.label = label
self.remove_when_done = remove_when_done
self._done = False
self.total: int | None
if total is None:
try:
self.total = len(cast(Sized, data))
except TypeError:
self.total = None # We don't know the total length.
else:
self.total = total
def __iter__(self) -> Iterator[_CounterItem]:
if self.data is not None:
try:
for item in self.data:
yield item
self.item_completed()
# Only done if we iterate to the very end.
self.done = True
finally:
# Ensure counter has stopped even if we did not iterate to the
# end (e.g. break or exceptions).
self.stopped = True
else:
raise NotImplementedError("No data defined to iterate over.")
def item_completed(self) -> None:
"""
Start handling the next item.
(Can be called manually in case we don't have a collection to loop through.)
"""
self.items_completed += 1
self.progress_bar.invalidate()
@property
def done(self) -> bool:
"""Whether a counter has been completed.
Done counter have been stopped (see stopped) and removed depending on
remove_when_done value.
Contrast this with stopped. A stopped counter may be terminated before
100% completion. A done counter has reached its 100% completion.
"""
return self._done
@done.setter
def done(self, value: bool) -> None:
self._done = value
self.stopped = value
if value and self.remove_when_done:
self.progress_bar.counters.remove(self)
@property
def stopped(self) -> bool:
"""Whether a counter has been stopped.
Stopped counters no longer have increasing time_elapsed. This distinction is
also used to prevent the Bar formatter with unknown totals from continuing to run.
A stopped counter (but not done) can be used to signal that a given counter has
encountered an error but allows other counters to continue
(e.g. download X of Y failed). Given how only done counters are removed
(see remove_when_done) this can help aggregate failures from a large number of
successes.
Contrast this with done. A done counter has reached its 100% completion.
A stopped counter may be terminated before 100% completion.
"""
return self.stop_time is not None
@stopped.setter
def stopped(self, value: bool) -> None:
if value:
# This counter has not already been stopped.
if not self.stop_time:
self.stop_time = datetime.datetime.now()
else:
# Clearing any previously set stop_time.
self.stop_time = None
@property
def percentage(self) -> float:
if self.total is None:
return 0
else:
return self.items_completed * 100 / max(self.total, 1)
@property
def time_elapsed(self) -> datetime.timedelta:
"""
Return how much time has been elapsed since the start.
"""
if self.stop_time is None:
return datetime.datetime.now() - self.start_time
else:
return self.stop_time - self.start_time
@property
def time_left(self) -> datetime.timedelta | None:
"""
Timedelta representing the time left.
"""
if self.total is None or not self.percentage:
return None
elif self.done or self.stopped:
return datetime.timedelta(0)
else:
return self.time_elapsed * (100 - self.percentage) / self.percentage