mirror of https://github.com/JoshKarpel/spiel
parent
e9eab42a2c
commit
9277169e20
File diff suppressed because it is too large
Load Diff
@ -1,8 +1,4 @@
|
||||
from spiel.constants import __version__
|
||||
from spiel.deck import Deck
|
||||
from spiel.example import Example, example_panels
|
||||
from spiel.image import Image
|
||||
from spiel.options import Options
|
||||
from spiel.repls import repl
|
||||
from spiel.slide import Slide
|
||||
from spiel.triggers import Triggers
|
||||
|
@ -1,4 +1,3 @@
|
||||
from spiel.constants import PACKAGE_NAME
|
||||
from spiel.main import app
|
||||
from spiel.cli import cli
|
||||
|
||||
app(prog_name=PACKAGE_NAME)
|
||||
cli()
|
||||
|
@ -0,0 +1,187 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import code
|
||||
import datetime
|
||||
import importlib.util
|
||||
import sys
|
||||
from asyncio import wait
|
||||
from contextlib import contextmanager, redirect_stderr, redirect_stdout
|
||||
from functools import cached_property, partial
|
||||
from pathlib import Path
|
||||
from time import monotonic
|
||||
from typing import Callable, Iterator
|
||||
|
||||
from rich.style import Style
|
||||
from rich.text import Text
|
||||
from textual import log
|
||||
from textual.app import App
|
||||
from textual.binding import Binding
|
||||
from textual.events import Resize
|
||||
from textual.reactive import reactive
|
||||
from watchfiles import awatch
|
||||
|
||||
from spiel.constants import DECK, RELOAD_MESSAGE_TIME_FORMAT
|
||||
from spiel.deck import Deck
|
||||
from spiel.exceptions import NoDeckFound
|
||||
from spiel.screens.deck import DeckScreen
|
||||
from spiel.screens.help import HelpScreen
|
||||
from spiel.screens.slide import SlideScreen
|
||||
from spiel.triggers import Triggers
|
||||
from spiel.utils import clamp
|
||||
from spiel.widgets.slide import SlideWidget
|
||||
|
||||
|
||||
def load_deck(path: Path) -> Deck:
|
||||
module_name = "__deck"
|
||||
spec = importlib.util.spec_from_file_location(module_name, path)
|
||||
|
||||
if spec is None:
|
||||
raise NoDeckFound(f"{path.resolve()} does not appear to be an importable Python module.")
|
||||
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
sys.modules[module_name] = module
|
||||
|
||||
loader = spec.loader
|
||||
assert loader is not None
|
||||
loader.exec_module(module)
|
||||
|
||||
try:
|
||||
deck = getattr(module, DECK)
|
||||
except AttributeError:
|
||||
raise NoDeckFound(f"The module at {path} does not have an attribute named {DECK}.")
|
||||
|
||||
if not isinstance(deck, Deck):
|
||||
raise NoDeckFound(
|
||||
f"The module at {path} has an attribute named {DECK}, but it is a {type(deck).__name__}, not a {Deck.__name__}."
|
||||
)
|
||||
|
||||
return deck
|
||||
|
||||
|
||||
class SpielApp(App[None]):
|
||||
CSS_PATH = "spiel.css"
|
||||
BINDINGS = [
|
||||
Binding("d", "switch_screen('deck')", "Go to the Deck view."),
|
||||
Binding("question_mark", "push_screen('help')", "Go to the Help view."),
|
||||
Binding("i", "repl", "Switch to the REPL."),
|
||||
]
|
||||
SCREENS = {"slide": SlideScreen(), "deck": DeckScreen(), "help": HelpScreen()}
|
||||
|
||||
deck = reactive(Deck(name="New Deck"))
|
||||
current_slide_idx = reactive(0)
|
||||
message = reactive(Text(""))
|
||||
|
||||
def __init__(self, deck_path: Path, watch_path: Path):
|
||||
super().__init__()
|
||||
|
||||
self.deck_path = deck_path
|
||||
self.watch_path = watch_path
|
||||
|
||||
async def on_mount(self) -> None:
|
||||
self.deck = load_deck(self.deck_path)
|
||||
self.reloader = asyncio.create_task(self.reload())
|
||||
|
||||
await self.push_screen("slide")
|
||||
|
||||
async def reload(self) -> None:
|
||||
log(f"Watching {self.watch_path} for changes")
|
||||
async for changes in awatch(self.watch_path):
|
||||
change_msg = "\n ".join([""] + [f"{k.raw_str()}: {v}" for k, v in changes])
|
||||
log(f"Reloading deck from {self.deck_path} due to detected file changes:{change_msg}")
|
||||
try:
|
||||
self.deck = load_deck(self.deck_path)
|
||||
self.current_slide_idx = clamp(self.current_slide_idx, 0, len(self.deck))
|
||||
self.set_message_temporarily(
|
||||
Text(
|
||||
f"Reloaded deck at {datetime.datetime.now().strftime(RELOAD_MESSAGE_TIME_FORMAT)}",
|
||||
style=Style(dim=True),
|
||||
),
|
||||
delay=10,
|
||||
)
|
||||
except Exception as e:
|
||||
self.set_message_temporarily(
|
||||
Text(
|
||||
f"Failed to reload deck at {datetime.datetime.now().strftime(RELOAD_MESSAGE_TIME_FORMAT)} due to: {e}",
|
||||
style=Style(color="red"),
|
||||
),
|
||||
delay=10,
|
||||
)
|
||||
|
||||
def on_resize(self, event: Resize) -> None:
|
||||
self.set_message_temporarily(
|
||||
message=Text(f"Screen resized to {event.size}", style=Style(dim=True)), delay=2
|
||||
)
|
||||
|
||||
def set_message_temporarily(self, message: Text, delay: float) -> None:
|
||||
self.message = message
|
||||
|
||||
def clear() -> None:
|
||||
if self.message is message:
|
||||
self.message = Text("")
|
||||
|
||||
self.set_timer(delay, clear)
|
||||
|
||||
def action_next_slide(self) -> None:
|
||||
self.current_slide_idx = clamp(self.current_slide_idx + 1, 0, len(self.deck) - 1)
|
||||
|
||||
def action_prev_slide(self) -> None:
|
||||
self.current_slide_idx = clamp(self.current_slide_idx - 1, 0, len(self.deck) - 1)
|
||||
|
||||
def action_next_row(self) -> None:
|
||||
self.current_slide_idx = clamp(
|
||||
self.current_slide_idx + self.deck_grid_width, 0, len(self.deck) - 1
|
||||
)
|
||||
|
||||
def action_prev_row(self) -> None:
|
||||
self.current_slide_idx = clamp(
|
||||
self.current_slide_idx - self.deck_grid_width, 0, len(self.deck) - 1
|
||||
)
|
||||
|
||||
def watch_current_slide_idx(self, new_current_slide_idx: int) -> None:
|
||||
self.query_one(SlideWidget).triggers = Triggers.new()
|
||||
|
||||
def action_trigger(self) -> None:
|
||||
now = monotonic()
|
||||
slide_widget = self.query_one(SlideWidget)
|
||||
slide_widget.triggers = Triggers(now=now, times=(*slide_widget.triggers.times, now))
|
||||
|
||||
def action_reset_trigger(self) -> None:
|
||||
slide_widget = self.query_one(SlideWidget)
|
||||
slide_widget.triggers = Triggers.new()
|
||||
|
||||
@cached_property
|
||||
def repl(self) -> Callable[[], None]:
|
||||
# Lazily enable readline support
|
||||
import readline # nopycln: import
|
||||
|
||||
self.console.clear() # clear the console the first time we go into the repl
|
||||
sys.stdout.flush()
|
||||
|
||||
repl = code.InteractiveConsole()
|
||||
return partial(repl.interact, banner="", exitmsg="")
|
||||
|
||||
def action_repl(self) -> None:
|
||||
with self.suspend():
|
||||
self.repl()
|
||||
|
||||
async def action_quit(self) -> None:
|
||||
self.reloader.cancel()
|
||||
await wait([self.reloader], timeout=1)
|
||||
|
||||
await super().action_quit()
|
||||
|
||||
@contextmanager
|
||||
def suspend(self) -> Iterator[None]:
|
||||
driver = self._driver
|
||||
|
||||
if driver is not None:
|
||||
driver.stop_application_mode()
|
||||
driver.exit_event.clear() # type: ignore[attr-defined]
|
||||
with redirect_stdout(sys.__stdout__), redirect_stderr(sys.__stderr__):
|
||||
yield
|
||||
driver.start_application_mode()
|
||||
|
||||
@property
|
||||
def deck_grid_width(self) -> int:
|
||||
return max(self.console.size.width // 35, 1)
|
@ -1,15 +1,18 @@
|
||||
import os
|
||||
import sys
|
||||
from importlib import metadata
|
||||
from pathlib import Path
|
||||
|
||||
PACKAGE_NAME = "spiel"
|
||||
__version__ = metadata.version(PACKAGE_NAME)
|
||||
__rich_version__ = metadata.version("rich")
|
||||
__textual_version__ = metadata.version("textual")
|
||||
__python_version__ = ".".join(map(str, sys.version_info))
|
||||
|
||||
DECK = "deck"
|
||||
OPTIONS = "options"
|
||||
|
||||
TARGET_RPS = 30
|
||||
PACKAGE_DIR = Path(__file__).resolve().parent
|
||||
DEMO_DIR = PACKAGE_DIR / "demo"
|
||||
DEMO_FILE = PACKAGE_DIR / "demo" / "demo.py"
|
||||
|
||||
EDITOR = os.getenv("EDITOR", "not set")
|
||||
FOOTER_TIME_FORMAT = "%Y-%m-%d %I:%M %p"
|
||||
RELOAD_MESSAGE_TIME_FORMAT = "%I:%M:%S %p"
|
||||
|
@ -1,75 +1,40 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import dis
|
||||
import inspect
|
||||
import sys
|
||||
from collections.abc import Callable, Collection, Iterator, Sequence
|
||||
from dataclasses import dataclass, field
|
||||
from textwrap import dedent
|
||||
from typing import Callable, Iterator, Mapping
|
||||
|
||||
from spiel.example import Example
|
||||
from spiel.presentable import Presentable
|
||||
from spiel.slide import MakeRenderable, Slide
|
||||
from spiel.slide import Content, Slide
|
||||
|
||||
|
||||
@dataclass
|
||||
class Deck(Collection[Presentable]):
|
||||
class Deck:
|
||||
name: str
|
||||
slides: list[Presentable] = field(default_factory=list)
|
||||
|
||||
def __getitem__(self, idx: int) -> Presentable:
|
||||
return self.slides[idx]
|
||||
slides: list[Slide] = field(default_factory=list)
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self.slides)
|
||||
|
||||
def __iter__(self) -> Iterator[Presentable]:
|
||||
return iter(self.slides)
|
||||
|
||||
def __contains__(self, obj: object) -> bool:
|
||||
return obj in self.slides
|
||||
def __getitem__(self, item: int) -> Slide:
|
||||
return self.slides[item]
|
||||
|
||||
def add_slides(self, *slides: Presentable) -> Deck:
|
||||
self.slides.extend(slides)
|
||||
return self
|
||||
def __iter__(self) -> Iterator[Slide]:
|
||||
yield from self.slides
|
||||
|
||||
def slide(
|
||||
self,
|
||||
title: str = "",
|
||||
) -> Callable[[MakeRenderable], Slide]:
|
||||
def slideify(content: MakeRenderable) -> Slide:
|
||||
bindings: Mapping[str, Callable[[], None]] | None = None,
|
||||
) -> Callable[[Content], Slide]:
|
||||
def slideify(content: Content) -> Slide:
|
||||
slide = Slide(
|
||||
title=title,
|
||||
content=content,
|
||||
bindings=bindings or {},
|
||||
)
|
||||
self.add_slides(slide)
|
||||
return slide
|
||||
|
||||
return slideify
|
||||
|
||||
def example(
|
||||
self,
|
||||
title: str = "",
|
||||
command: Sequence[str] = (sys.executable,),
|
||||
name: str = "example.py",
|
||||
language: str = "python",
|
||||
) -> Callable[[Callable[[], None]], Example]:
|
||||
def exampleify(example: Callable[[], None]) -> Example:
|
||||
ex = Example(
|
||||
title=title,
|
||||
source=get_function_body(example),
|
||||
command=command,
|
||||
name=name,
|
||||
language=language,
|
||||
)
|
||||
self.add_slides(ex)
|
||||
return ex
|
||||
|
||||
return exampleify
|
||||
|
||||
|
||||
def get_function_body(function: Callable[..., object]) -> str:
|
||||
lines, line_of_def_start = inspect.getsourcelines(function)
|
||||
line_of_first_instruction = list(dis.Bytecode(function))[0].starts_line or line_of_def_start
|
||||
offset = line_of_first_instruction - line_of_def_start
|
||||
return dedent("".join(lines[offset:]))
|
||||
def add_slides(self, *slides: Slide) -> None:
|
||||
self.slides.extend(slides)
|
||||
|
@ -1,114 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import shlex
|
||||
import sys
|
||||
import tempfile
|
||||
from collections.abc import Callable, Sequence
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from subprocess import PIPE, STDOUT, run
|
||||
|
||||
from rich.align import Align
|
||||
from rich.console import ConsoleRenderable
|
||||
from rich.layout import Layout
|
||||
from rich.panel import Panel
|
||||
from rich.syntax import Syntax
|
||||
from rich.text import Text
|
||||
|
||||
from .presentable import Presentable
|
||||
from .triggers import Triggers
|
||||
|
||||
|
||||
@dataclass
|
||||
class CachedExample:
|
||||
trigger_number: int
|
||||
input: str
|
||||
output: str | None
|
||||
|
||||
|
||||
def example_panels(example: Example) -> ConsoleRenderable:
|
||||
root = Layout()
|
||||
root.split_column(
|
||||
Layout(
|
||||
Align.center(
|
||||
Panel(
|
||||
example.input,
|
||||
title=example.name,
|
||||
title_align="left",
|
||||
expand=False,
|
||||
)
|
||||
)
|
||||
),
|
||||
Layout(
|
||||
Align.center(
|
||||
Panel(
|
||||
example.output,
|
||||
title=f"$ {example.display_command}",
|
||||
title_align="left",
|
||||
expand=False,
|
||||
)
|
||||
if example.output is not None
|
||||
else Text(" ")
|
||||
)
|
||||
),
|
||||
)
|
||||
return root
|
||||
|
||||
|
||||
ExampleLayout = Callable[["Example"], ConsoleRenderable]
|
||||
|
||||
|
||||
@dataclass
|
||||
class Example(Presentable):
|
||||
source: str = ""
|
||||
command: Sequence[str] = (sys.executable,)
|
||||
name: str = "example.py"
|
||||
language: str = "python"
|
||||
_layout: ExampleLayout = example_panels
|
||||
_cache: CachedExample | None = None
|
||||
|
||||
def layout(self, function: ExampleLayout) -> ExampleLayout:
|
||||
self._layout = function
|
||||
return function
|
||||
|
||||
@property
|
||||
def display_command(self) -> str:
|
||||
return shlex.join([Path(self.command[0]).stem, *self.command[1:], self.name])
|
||||
|
||||
def execute(self) -> str:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
dir = Path(tmpdir)
|
||||
file = dir / self.name
|
||||
file.write_text(self.source)
|
||||
result = run([*self.command, file], stdout=PIPE, stderr=STDOUT, text=True)
|
||||
return result.stdout
|
||||
|
||||
@property
|
||||
def input(self) -> Syntax:
|
||||
input = (self._cache.input or "") if self._cache is not None else ""
|
||||
return Syntax(
|
||||
input.strip(),
|
||||
lexer=self.language,
|
||||
code_width=max(len(line) for line in input.splitlines()),
|
||||
)
|
||||
|
||||
@property
|
||||
def output(self) -> Text | None:
|
||||
return (
|
||||
Text(self._cache.output)
|
||||
if (self._cache is not None and self._cache.output is not None)
|
||||
else None
|
||||
)
|
||||
|
||||
def clear_cache(self) -> None:
|
||||
self._cache = None
|
||||
|
||||
def render(self, triggers: Triggers) -> ConsoleRenderable:
|
||||
if self._cache is None:
|
||||
self._cache = CachedExample(len(triggers), self.source, None)
|
||||
elif self._cache.trigger_number != len(triggers):
|
||||
self._cache = CachedExample(len(triggers), self.source, self.execute())
|
||||
|
||||
return self._layout(
|
||||
self, **self.get_render_kwargs(function=self._layout, triggers=triggers)
|
||||
)
|
@ -1,81 +0,0 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
from pendulum import now
|
||||
from rich.console import ConsoleRenderable
|
||||
from rich.style import Style
|
||||
from rich.table import Column, Table
|
||||
from rich.text import Text
|
||||
|
||||
from spiel.modes import Mode
|
||||
from spiel.rps import RPSCounter
|
||||
from spiel.state import State
|
||||
from spiel.utils import drop_nones, filter_join
|
||||
|
||||
|
||||
@dataclass
|
||||
class Footer:
|
||||
state: State
|
||||
rps_counter: RPSCounter
|
||||
|
||||
@property
|
||||
def longest_slide_number_length(self) -> int:
|
||||
num_slides = len(self.state.deck)
|
||||
return len(str(num_slides))
|
||||
|
||||
def __rich__(self) -> ConsoleRenderable:
|
||||
grid = Table.grid(
|
||||
*drop_nones(
|
||||
Column(
|
||||
style=Style(dim=True),
|
||||
justify="left",
|
||||
),
|
||||
Column(
|
||||
style=Style(bold=True),
|
||||
justify="center",
|
||||
),
|
||||
Column(
|
||||
style=Style(dim=True),
|
||||
justify="right",
|
||||
)
|
||||
if self.state.options.profiling
|
||||
else None,
|
||||
Column(
|
||||
style=Style(dim=True),
|
||||
justify="right",
|
||||
),
|
||||
Column(
|
||||
style=Style(dim=True),
|
||||
justify="right",
|
||||
),
|
||||
),
|
||||
expand=True,
|
||||
padding=1,
|
||||
)
|
||||
grid.add_row(
|
||||
*drop_nones(
|
||||
Text(
|
||||
filter_join(
|
||||
" | ",
|
||||
[
|
||||
self.state.deck.name,
|
||||
self.state.current_slide.title
|
||||
if self.state.mode is Mode.SLIDE
|
||||
else None,
|
||||
],
|
||||
)
|
||||
),
|
||||
self.state.message,
|
||||
Text(
|
||||
f"Render Time: {self.rps_counter.last_elapsed_render_time() * 1e3:>3.3f} ms | {self.rps_counter.renders_per_second():.2f} RPS"
|
||||
)
|
||||
if self.state.options.profiling
|
||||
else None,
|
||||
now().format(self.state.options.footer_time_format),
|
||||
Text(
|
||||
f"[{self.state.current_slide_idx + 1:>0{self.longest_slide_number_length}d} / {len(self.state.deck)}]"
|
||||
)
|
||||
if self.state.mode is not Mode.HELP
|
||||
else Text(Mode.HELP.value, style=Style(italic=True)),
|
||||
)
|
||||
)
|
||||
return grid
|
@ -1,96 +0,0 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
from click._termui_impl import Editor
|
||||
from rich.align import Align
|
||||
from rich.console import Console, ConsoleRenderable, Group
|
||||
from rich.padding import Padding
|
||||
from rich.style import Style
|
||||
from rich.table import Column, Table
|
||||
from rich.text import Text
|
||||
|
||||
from spiel.constants import PACKAGE_NAME, __python_version__, __rich_version__, __version__
|
||||
from spiel.input import INPUT_HANDLER_HELP, SpecialCharacters
|
||||
from spiel.modes import Mode
|
||||
from spiel.state import State
|
||||
|
||||
|
||||
@dataclass
|
||||
class Help:
|
||||
state: State
|
||||
|
||||
def __rich__(self) -> ConsoleRenderable:
|
||||
action_table = Table(
|
||||
Column(
|
||||
"Action",
|
||||
style=Style(bold=True),
|
||||
),
|
||||
Column(
|
||||
"Keys",
|
||||
style=Style(bold=True),
|
||||
justify="center",
|
||||
),
|
||||
Column(
|
||||
"Modes",
|
||||
justify="center",
|
||||
),
|
||||
Column(
|
||||
"Description",
|
||||
),
|
||||
show_lines=True,
|
||||
)
|
||||
|
||||
for info in INPUT_HANDLER_HELP:
|
||||
action_table.add_row(
|
||||
Text(info.name),
|
||||
Text(" ").join(
|
||||
Text(c.value if isinstance(c, SpecialCharacters) else c)
|
||||
for c in info.characters
|
||||
),
|
||||
Text(", ").join(Text(mode.value) for mode in info.modes)
|
||||
if len(info.modes) != len(list(Mode))
|
||||
else Text("any", style=Style(italic=True)),
|
||||
Text.from_markup(info.help),
|
||||
)
|
||||
|
||||
return Padding(
|
||||
Group(
|
||||
Align.center(action_table),
|
||||
Align.center(version_details(self.state.console)),
|
||||
),
|
||||
pad=(0, 1),
|
||||
)
|
||||
|
||||
|
||||
def version_details(console: Console) -> ConsoleRenderable:
|
||||
table = Table(
|
||||
Column(justify="right"),
|
||||
Column(justify="left"),
|
||||
show_header=False,
|
||||
box=None,
|
||||
)
|
||||
|
||||
table.add_row(f"{PACKAGE_NAME.capitalize()} Version", __version__)
|
||||
table.add_row("Rich Version", __rich_version__)
|
||||
table.add_row("Python Version", __python_version__, end_section=True)
|
||||
|
||||
table.add_row(
|
||||
"Color System",
|
||||
Text(
|
||||
console.color_system or "unknown",
|
||||
style=Style(color="red" if console.color_system != "truecolor" else "green"),
|
||||
),
|
||||
)
|
||||
table.add_row(
|
||||
"Console Dimensions",
|
||||
Text(f"{console.width} cells wide, {console.height} cells tall"),
|
||||
end_section=True,
|
||||
)
|
||||
|
||||
editor = Editor().get_editor()
|
||||
table.add_row(
|
||||
"Editor",
|
||||
Text(editor),
|
||||
end_section=True,
|
||||
)
|
||||
|
||||
return table
|
@ -1,424 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import contextlib
|
||||
import inspect
|
||||
import string
|
||||
import sys
|
||||
import termios
|
||||
from collections.abc import Callable, Iterable, Iterator, MutableMapping
|
||||
from contextlib import contextmanager
|
||||
from copy import deepcopy
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum, unique
|
||||
from io import UnsupportedOperation
|
||||
from itertools import product
|
||||
from pathlib import Path
|
||||
from textwrap import dedent
|
||||
from typing import Any, NoReturn, TextIO
|
||||
|
||||
import typer
|
||||
from rich.control import Control
|
||||
from rich.text import Text
|
||||
from tomli import TOMLDecodeError
|
||||
from typer import Exit
|
||||
|
||||
from spiel.constants import EDITOR, PACKAGE_NAME
|
||||
from spiel.example import Example
|
||||
from spiel.exceptions import DuplicateInputHandler, InvalidOptionValue
|
||||
from spiel.modes import Mode
|
||||
from spiel.options import Options
|
||||
from spiel.repls import REPLS
|
||||
from spiel.state import State
|
||||
|
||||
LFLAG = 3
|
||||
CC = 6
|
||||
|
||||
|
||||
try:
|
||||
ORIGINAL_TCGETATTR: list[Any] | None = termios.tcgetattr(sys.stdin)
|
||||
except (UnsupportedOperation, termios.error):
|
||||
ORIGINAL_TCGETATTR = None
|
||||
|
||||
|
||||
@contextmanager
|
||||
def no_echo() -> Iterator[None]:
|
||||
try:
|
||||
start_no_echo(sys.stdin)
|
||||
yield
|
||||
finally:
|
||||
reset_tty(sys.stdin)
|
||||
|
||||
|
||||
def start_no_echo(stream: TextIO) -> None:
|
||||
if ORIGINAL_TCGETATTR is None:
|
||||
return
|
||||
|
||||
mode = deepcopy(ORIGINAL_TCGETATTR)
|
||||
|
||||
mode[LFLAG] = mode[LFLAG] & ~(termios.ECHO | termios.ICANON)
|
||||
mode[CC][termios.VMIN] = 1
|
||||
mode[CC][termios.VTIME] = 0
|
||||
|
||||
termios.tcsetattr(stream.fileno(), termios.TCSADRAIN, mode)
|
||||
|
||||
|
||||
def reset_tty(stream: TextIO) -> None:
|
||||
if ORIGINAL_TCGETATTR is None:
|
||||
return
|
||||
|
||||
termios.tcsetattr(stream.fileno(), termios.TCSADRAIN, ORIGINAL_TCGETATTR)
|
||||
|
||||
|
||||
@unique
|
||||
class SpecialCharacters(Enum):
|
||||
Up = "↑"
|
||||
Down = "↓"
|
||||
Right = "→"
|
||||
Left = "←"
|
||||
CtrlUp = "ctrl-up"
|
||||
CtrlDown = "ctrl-down"
|
||||
CtrlRight = "ctrl-right"
|
||||
CtrlLeft = "ctrl-left"
|
||||
CtrlK = "ctrl-k"
|
||||
CtrlC = "ctrl-c"
|
||||
ShiftUp = "shift-up"
|
||||
ShiftDown = "shift-down"
|
||||
ShiftRight = "shift-right"
|
||||
ShiftLeft = "shift-left"
|
||||
CtrlShiftUp = "ctrl-shift-up"
|
||||
CtrlShiftDown = "ctrl-shift-down"
|
||||
CtrlShiftRight = "ctrl-shift-right"
|
||||
CtrlShiftLeft = "ctrl-shift-left"
|
||||
Backspace = "backspace"
|
||||
CtrlSpace = "ctrl-space"
|
||||
Enter = "enter"
|
||||
|
||||
|
||||
SPECIAL_CHARACTERS = {
|
||||
"\x1b[A": SpecialCharacters.Up,
|
||||
"\x1b[B": SpecialCharacters.Down,
|
||||
"\x1b[C": SpecialCharacters.Right,
|
||||
"\x1b[D": SpecialCharacters.Left,
|
||||
"\x0b": SpecialCharacters.CtrlK,
|
||||
"\x1b[1;5A": SpecialCharacters.CtrlUp,
|
||||
"\x1b[1;5B": SpecialCharacters.CtrlDown,
|
||||
"\x1b[1;5C": SpecialCharacters.CtrlRight,
|
||||
"\x1b[1;5D": SpecialCharacters.CtrlLeft,
|
||||
"\x1b[1;2A": SpecialCharacters.ShiftUp,
|
||||
"\x1b[1;2B": SpecialCharacters.ShiftDown,
|
||||
"\x1b[1;2C": SpecialCharacters.ShiftRight,
|
||||
"\x1b[1;2D": SpecialCharacters.ShiftLeft,
|
||||
"\x1b[1;6A": SpecialCharacters.CtrlShiftUp,
|
||||
"\x1b[1;6B": SpecialCharacters.CtrlShiftDown,
|
||||
"\x1b[1;6C": SpecialCharacters.CtrlShiftRight,
|
||||
"\x1b[1;6D": SpecialCharacters.CtrlShiftLeft,
|
||||
"\x7f": SpecialCharacters.Backspace,
|
||||
"\x00": SpecialCharacters.CtrlSpace,
|
||||
"\n": SpecialCharacters.Enter,
|
||||
}
|
||||
|
||||
|
||||
def get_character(stream: TextIO) -> str | SpecialCharacters:
|
||||
result = stream.read(1)
|
||||
|
||||
if result == "": # this happens when stdin gets closed; equivalent to a quit
|
||||
raise Exit(code=0)
|
||||
|
||||
if result[-1] == "\x1b":
|
||||
result += stream.read(2)
|
||||
|
||||
if len(result) != 1 and result[-1] == "1":
|
||||
result += stream.read(3)
|
||||
|
||||
return SPECIAL_CHARACTERS.get(result, result)
|
||||
|
||||
|
||||
Character = str | SpecialCharacters
|
||||
InputHandler = Callable[[State], NoReturn | None]
|
||||
InputHandlerKey = tuple[Character, Mode]
|
||||
InputHandlerDecorator = Callable[[InputHandler], InputHandler]
|
||||
InputHandlers = MutableMapping[InputHandlerKey, InputHandler]
|
||||
|
||||
INPUT_HANDLERS: InputHandlers = {} # type: ignore[assignment]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class InputHandlerHelpInfo:
|
||||
name: str
|
||||
help: str
|
||||
characters: tuple[Character, ...]
|
||||
modes: list[Mode]
|
||||
|
||||
|
||||
INPUT_HANDLER_HELP: list[InputHandlerHelpInfo] = []
|
||||
|
||||
|
||||
def handle_input(
|
||||
state: State,
|
||||
stream: TextIO,
|
||||
handlers: InputHandlers = INPUT_HANDLERS,
|
||||
) -> NoReturn | None:
|
||||
character = get_character(stream)
|
||||
|
||||
try:
|
||||
handler = handlers[(character, state.mode)]
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
return handler(state)
|
||||
|
||||
|
||||
def normalize_help(help: str) -> str:
|
||||
return dedent(help).replace("\n", " ").strip()
|
||||
|
||||
|
||||
def input_handler(
|
||||
*characters: Character,
|
||||
modes: Iterable[Mode] | None = None,
|
||||
handlers: InputHandlers = INPUT_HANDLERS,
|
||||
name: str | None = None,
|
||||
help: str,
|
||||
) -> InputHandlerDecorator:
|
||||
target_modes = list(modes or list(Mode))
|
||||
|
||||
def registrar(func: InputHandler) -> InputHandler:
|
||||
for character, mode in product(characters, target_modes):
|
||||
key: InputHandlerKey = (character, mode)
|
||||
# Don't allow duplicate handlers to be registered inside this module,
|
||||
# but DO let end-users register them.
|
||||
if key in handlers and inspect.getmodule(func) == inspect.getmodule(input_handler):
|
||||
raise DuplicateInputHandler(
|
||||
f"{character} is already registered as an input handler for mode {mode}"
|
||||
)
|
||||
handlers[key] = func
|
||||
|
||||
INPUT_HANDLER_HELP.append(
|
||||
InputHandlerHelpInfo(
|
||||
name=name or " ".join(word.capitalize() for word in func.__name__.split("_")),
|
||||
help=normalize_help(help),
|
||||
characters=characters,
|
||||
modes=target_modes,
|
||||
)
|
||||
)
|
||||
|
||||
return func
|
||||
|
||||
return registrar
|
||||
|
||||
|
||||
NOT_HELP = [Mode.SLIDE, Mode.DECK]
|
||||
|
||||
|
||||
@input_handler(
|
||||
"h",
|
||||
help=f"Enter {Mode.HELP} mode.",
|
||||
)
|
||||
def help_mode(state: State) -> None:
|
||||
state.mode = Mode.HELP
|
||||
|
||||
|
||||
@input_handler(
|
||||
"s",
|
||||
help=f"Enter {Mode.SLIDE} mode.",
|
||||
)
|
||||
def slide_mode(state: State) -> None:
|
||||
state.mode = Mode.SLIDE
|
||||
|
||||
|
||||
@input_handler(
|
||||
"d",
|
||||
help=f"Enter {Mode.DECK} mode.",
|
||||
)
|
||||
def deck_mode(state: State) -> None:
|
||||
state.mode = Mode.DECK
|
||||
|
||||
|
||||
@input_handler(
|
||||
"p",
|
||||
help=f"Enter {Mode.OPTIONS} mode.",
|
||||
)
|
||||
def options_mode(state: State) -> None:
|
||||
state.mode = Mode.OPTIONS
|
||||
|
||||
|
||||
@input_handler(
|
||||
"e",
|
||||
modes=[Mode.OPTIONS],
|
||||
help=f"Open your $EDITOR ([bold]{EDITOR}[/bold]) to edit options (as TOML).",
|
||||
)
|
||||
def edit_options(state: State) -> None:
|
||||
with suspend_live(state):
|
||||
new_toml = state.options.as_toml()
|
||||
while True:
|
||||
new_toml = _clean_toml(
|
||||
typer.edit(text=new_toml, extension=".toml", require_save=False) or ""
|
||||
)
|
||||
try:
|
||||
state.options = Options.from_toml(new_toml)
|
||||
return
|
||||
except TOMLDecodeError as e:
|
||||
new_toml = f"{new_toml}\n\n# Parse Error: {e}\n"
|
||||
except InvalidOptionValue as e:
|
||||
new_toml = f"{new_toml}\n\n# Invalid Option Value: {e}\n"
|
||||
except Exception as e:
|
||||
new_toml = f"{new_toml}\n\n# Error: {e}\n"
|
||||
|
||||
|
||||
def _clean_toml(s: str) -> str:
|
||||
return "\n".join(line for line in s.splitlines() if (line and not line.startswith("#")))
|
||||
|
||||
|
||||
@input_handler(
|
||||
SpecialCharacters.Right,
|
||||
"f",
|
||||
modes=NOT_HELP,
|
||||
help="Move to the next slide.",
|
||||
)
|
||||
def next_slide(state: State) -> None:
|
||||
state.next_slide()
|
||||
|
||||
|
||||
@input_handler(
|
||||
SpecialCharacters.Left,
|
||||
"b",
|
||||
modes=NOT_HELP,
|
||||
help="Move to the previous slide.",
|
||||
)
|
||||
def previous_slide(state: State) -> None:
|
||||
state.previous_slide()
|
||||
|
||||
|
||||
@input_handler(
|
||||
SpecialCharacters.Up,
|
||||
modes=[Mode.DECK],
|
||||
help="Move to the previous deck grid row.",
|
||||
)
|
||||
def up_grid_row(state: State) -> None:
|
||||
state.previous_slide(move=state.deck_grid_width)
|
||||
|
||||
|
||||
@input_handler(
|
||||
SpecialCharacters.Down,
|
||||
modes=[Mode.DECK],
|
||||
help="Move to the next deck grid row.",
|
||||
)
|
||||
def down_grid_row(state: State) -> None:
|
||||
state.next_slide(move=state.deck_grid_width)
|
||||
|
||||
|
||||
@input_handler(
|
||||
"j",
|
||||
modes=NOT_HELP,
|
||||
help="""\
|
||||
Press the action key, then a slide number (e.g., [bold]17[/bold]), then press [bold]enter[/bold], to jump to that slide.
|
||||
If the slide number is unambiguous, the jump will happen without needing to press [bold]enter[/bold]
|
||||
(e.g., you enter [bold]3[/bold] and there are only [bold]8[/bold] slides).
|
||||
""",
|
||||
)
|
||||
def jump_to_slide(state: State) -> None:
|
||||
slide_number = ""
|
||||
|
||||
def display() -> None:
|
||||
state.set_message(Text(f"Jumping to slide {slide_number}..."))
|
||||
|
||||
def jump() -> None:
|
||||
state.clear_message()
|
||||
if slide_number == "":
|
||||
return
|
||||
state.jump_to_slide(int(slide_number) - 1)
|
||||
return
|
||||
|
||||
display()
|
||||
|
||||
while True:
|
||||
char = get_character(sys.stdin)
|
||||
|
||||
if char is SpecialCharacters.Backspace:
|
||||
slide_number = slide_number[:-1]
|
||||
elif char is SpecialCharacters.Enter:
|
||||
return jump()
|
||||
elif isinstance(char, SpecialCharacters):
|
||||
continue
|
||||
elif char in string.digits:
|
||||
slide_number += char
|
||||
|
||||
display()
|
||||
|
||||
if len(slide_number) == len(str(len(state.deck))):
|
||||
return jump()
|
||||
|
||||
|
||||
@input_handler(
|
||||
"t",
|
||||
modes=[Mode.SLIDE],
|
||||
help="Trigger the slide: marks the current time and make it available to the slide's content rendering function.",
|
||||
)
|
||||
def trigger(state: State) -> None:
|
||||
state.trigger()
|
||||
|
||||
|
||||
@input_handler(
|
||||
"r",
|
||||
modes=[Mode.SLIDE],
|
||||
help="Reset the trigger state to as if the slide just started being displayed.",
|
||||
)
|
||||
def reset_trigger(state: State) -> None:
|
||||
state.reset_trigger()
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def suspend_live(state: State) -> Iterator[None]:
|
||||
live = state.console._live
|
||||
|
||||
if live is None:
|
||||
yield
|
||||
return
|
||||
|
||||
live.stop()
|
||||
yield
|
||||
live.start(refresh=True)
|
||||
|
||||
|
||||
@input_handler(
|
||||
"e",
|
||||
modes=[Mode.SLIDE],
|
||||
help=f"Open your $EDITOR ([bold]{EDITOR}[/bold]) on the source of an [bold]Example[/bold] slide. If the current slide is not an [bold]Example[/bold], do nothing.",
|
||||
)
|
||||
def edit_example(state: State) -> None:
|
||||
example = state.current_slide
|
||||
if isinstance(example, Example):
|
||||
with suspend_live(state):
|
||||
example.source = (
|
||||
typer.edit(
|
||||
text=example.source, extension=Path(example.name).suffix, require_save=False
|
||||
)
|
||||
or ""
|
||||
)
|
||||
example.clear_cache()
|
||||
|
||||
|
||||
@input_handler(
|
||||
"i",
|
||||
name="Start REPL",
|
||||
modes=NOT_HELP,
|
||||
help=f"Start an [link=https://ipython.readthedocs.io/en/stable/overview.html]IPython REPL[/link].",
|
||||
)
|
||||
def open_repl(state: State) -> None:
|
||||
with suspend_live(state):
|
||||
reset_tty(sys.stdin)
|
||||
state.console.print(Control.clear())
|
||||
state.console.print(Control.move_to(0, 0))
|
||||
|
||||
try:
|
||||
REPLS[state.options.repl]()
|
||||
finally:
|
||||
start_no_echo(sys.stdin)
|
||||
|
||||
|
||||
@input_handler(
|
||||
SpecialCharacters.CtrlK,
|
||||
SpecialCharacters.CtrlC,
|
||||
help=f"Exit {PACKAGE_NAME}.",
|
||||
)
|
||||
def exit(state: State) -> None:
|
||||
raise Exit(code=0)
|
@ -1,78 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from types import TracebackType
|
||||
from typing import ContextManager, Type
|
||||
|
||||
from watchdog.events import FileSystemEventHandler
|
||||
from watchdog.observers import Observer
|
||||
from watchdog.observers.polling import PollingObserver
|
||||
|
||||
from spiel.constants import DECK, OPTIONS
|
||||
from spiel.deck import Deck
|
||||
from spiel.exceptions import NoDeckFound
|
||||
from spiel.options import Options
|
||||
|
||||
|
||||
def load_deck_and_options(path: Path) -> tuple[Deck, Options]:
|
||||
module_name = "__deck"
|
||||
spec = importlib.util.spec_from_file_location(module_name, path)
|
||||
|
||||
if spec is None:
|
||||
raise FileNotFoundError(
|
||||
f"{path.resolve()} does not appear to be an importable Python module."
|
||||
)
|
||||
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
sys.modules[module_name] = module
|
||||
|
||||
loader = spec.loader
|
||||
assert loader is not None
|
||||
loader.exec_module(module)
|
||||
|
||||
try:
|
||||
deck = getattr(module, DECK)
|
||||
except AttributeError:
|
||||
raise NoDeckFound(f"The module at {path} does not have an attribute named {DECK}.")
|
||||
|
||||
if not isinstance(deck, Deck):
|
||||
raise NoDeckFound(
|
||||
f"The module at {path} has an attribute named {DECK}, but it is a {type(deck).__name__}, not a {Deck.__name__}."
|
||||
)
|
||||
|
||||
options = getattr(module, OPTIONS, Options())
|
||||
|
||||
if not isinstance(options, Options):
|
||||
options = Options()
|
||||
|
||||
return deck, options
|
||||
|
||||
|
||||
@dataclass
|
||||
class DeckWatcher(ContextManager["DeckWatcher"]):
|
||||
event_handler: FileSystemEventHandler
|
||||
path: Path
|
||||
poll: bool = False
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
self.observer = (PollingObserver if self.poll else Observer)(timeout=0.1)
|
||||
|
||||
def __enter__(self) -> DeckWatcher:
|
||||
self.observer.schedule(self.event_handler, str(self.path))
|
||||
self.observer.start()
|
||||
|
||||
return self
|
||||
|
||||
def __exit__(
|
||||
self,
|
||||
exc_type: Type[BaseException] | None,
|
||||
exc_value: BaseException | None,
|
||||
traceback: TracebackType | None,
|
||||
) -> bool | None:
|
||||
self.observer.stop()
|
||||
self.observer.join()
|
||||
|
||||
return None
|
@ -1,9 +0,0 @@
|
||||
from enum import Enum, unique
|
||||
|
||||
|
||||
@unique
|
||||
class Mode(str, Enum):
|
||||
SLIDE = "slide"
|
||||
DECK = "deck"
|
||||
HELP = "help"
|
||||
OPTIONS = "options"
|
@ -1,68 +0,0 @@
|
||||
from collections.abc import Mapping
|
||||
from dataclasses import asdict, dataclass, fields
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import tomli
|
||||
import tomli_w
|
||||
from rich.align import Align
|
||||
from rich.console import ConsoleRenderable
|
||||
from rich.padding import Padding
|
||||
from rich.table import Column, Table
|
||||
|
||||
from spiel.constants import PACKAGE_NAME
|
||||
from spiel.exceptions import InvalidOptionValue
|
||||
from spiel.repls import REPLS
|
||||
|
||||
|
||||
@dataclass
|
||||
class Options:
|
||||
repl: str = "ipython"
|
||||
footer_time_format: str = "YYYY-MM-DD hh:mm A"
|
||||
profiling: bool = False
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if self.repl not in REPLS:
|
||||
raise InvalidOptionValue(f"repl must be one of: {set(REPLS.keys())}")
|
||||
|
||||
def as_dict(self) -> Mapping[str, Any]:
|
||||
return asdict(self)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, d: Mapping[str, Any]) -> "Options":
|
||||
fields_by_name = {field.name: field for field in fields(cls)}
|
||||
only_valid = {k: fields_by_name[k].type(v) for k, v in d.items() if k in fields_by_name}
|
||||
return cls(**only_valid)
|
||||
|
||||
def as_toml(self) -> str:
|
||||
return tomli_w.dumps({PACKAGE_NAME: self.as_dict()})
|
||||
|
||||
@classmethod
|
||||
def from_toml(cls, t: str) -> "Options":
|
||||
return cls.from_dict(tomli.loads(t).get(PACKAGE_NAME, {}))
|
||||
|
||||
def save(self, path: Path) -> Path:
|
||||
path.write_text(self.as_toml())
|
||||
return path
|
||||
|
||||
@classmethod
|
||||
def load(cls, path: Path) -> "Options":
|
||||
return cls.from_toml(path.read_text())
|
||||
|
||||
def __rich__(self) -> ConsoleRenderable:
|
||||
table = Table(
|
||||
Column("Option"),
|
||||
Column("Type", justify="center"),
|
||||
Column("Value"),
|
||||
show_lines=True,
|
||||
)
|
||||
|
||||
fields_by_name = {field.name: field for field in fields(self)}
|
||||
|
||||
for key, value in self.as_dict().items():
|
||||
table.add_row(key, fields_by_name[key].type.__name__, str(value))
|
||||
|
||||
return Padding(
|
||||
Align.center(table),
|
||||
pad=(0, 1),
|
||||
)
|
@ -1,108 +0,0 @@
|
||||
import sys
|
||||
from itertools import islice
|
||||
from math import ceil
|
||||
from time import monotonic
|
||||
|
||||
from rich.console import ConsoleRenderable
|
||||
from rich.layout import Layout
|
||||
from rich.live import Live
|
||||
from rich.padding import Padding
|
||||
from rich.panel import Panel
|
||||
from rich.style import Style
|
||||
|
||||
from spiel.constants import TARGET_RPS
|
||||
from spiel.exceptions import UnknownModeError
|
||||
from spiel.footer import Footer
|
||||
from spiel.help import Help
|
||||
from spiel.input import handle_input, no_echo
|
||||
from spiel.modes import Mode
|
||||
from spiel.presentable import Presentable
|
||||
from spiel.rps import RPSCounter
|
||||
from spiel.state import State
|
||||
from spiel.triggers import Triggers
|
||||
from spiel.utils import clamp, filter_join
|
||||
|
||||
|
||||
def render_slide(state: State, slide: Presentable) -> ConsoleRenderable:
|
||||
return Padding(
|
||||
slide.render(triggers=Triggers(times=tuple(state.trigger_times))),
|
||||
pad=1,
|
||||
)
|
||||
|
||||
|
||||
def split_layout_into_deck_grid(root: Layout, state: State) -> Layout:
|
||||
grid_width = state.deck_grid_width
|
||||
row_of_current_slide = state.current_slide_idx // grid_width
|
||||
num_rows = ceil(len(state.deck) / grid_width)
|
||||
start_row = clamp(
|
||||
value=row_of_current_slide - (grid_width // 2),
|
||||
lower=0,
|
||||
upper=max(num_rows - grid_width, 0),
|
||||
)
|
||||
start_slide_idx = grid_width * start_row
|
||||
slides = islice(enumerate(state.deck.slides, start=1), start_slide_idx, None)
|
||||
|
||||
rows = [Layout(name=str(r)) for r in range(grid_width)]
|
||||
cols = [[Layout(name=f"{r}-{c}") for c in range(grid_width)] for r, _ in enumerate(rows)]
|
||||
|
||||
root.split_column(*rows)
|
||||
for row, layouts in zip(rows, cols):
|
||||
for layout in layouts:
|
||||
slide_number, slide = next(slides, (None, None))
|
||||
if slide is None:
|
||||
layout.update("")
|
||||
else:
|
||||
is_active_slide = slide is state.current_slide
|
||||
layout.update(
|
||||
Panel(
|
||||
slide.render(triggers=Triggers(times=(monotonic(),))),
|
||||
title=filter_join(" | ", [slide_number, slide.title]),
|
||||
border_style=Style(
|
||||
color="bright_cyan" if is_active_slide else None,
|
||||
dim=not is_active_slide,
|
||||
),
|
||||
)
|
||||
)
|
||||
row.split_row(*layouts)
|
||||
|
||||
return root
|
||||
|
||||
|
||||
def present_deck(state: State) -> None:
|
||||
rps_counter = RPSCounter()
|
||||
footer = Layout(Footer(state, rps_counter), name="footer", size=1)
|
||||
help = Layout(Help(state), name="help")
|
||||
|
||||
def get_renderable() -> Layout:
|
||||
current_slide = state.deck[state.current_slide_idx]
|
||||
|
||||
body = Layout(name="body", ratio=1)
|
||||
if state.mode is Mode.SLIDE:
|
||||
body.update(render_slide(state, current_slide))
|
||||
elif state.mode is Mode.DECK:
|
||||
split_layout_into_deck_grid(body, state)
|
||||
elif state.mode is Mode.HELP:
|
||||
body.update(help)
|
||||
elif state.mode is Mode.OPTIONS:
|
||||
body.update(state.options)
|
||||
else: # pragma: unreachable
|
||||
raise UnknownModeError(f"Unrecognized mode: {state.mode!r}")
|
||||
|
||||
root = Layout(name="root")
|
||||
root.split_column(body, footer)
|
||||
|
||||
rps_counter.mark()
|
||||
|
||||
return root
|
||||
|
||||
with no_echo(), Live(
|
||||
get_renderable=get_renderable,
|
||||
console=state.console,
|
||||
screen=True,
|
||||
auto_refresh=True,
|
||||
refresh_per_second=TARGET_RPS,
|
||||
vertical_overflow="visible",
|
||||
) as live:
|
||||
while True:
|
||||
handle_input(state, sys.stdin)
|
||||
live.refresh()
|
@ -1,27 +0,0 @@
|
||||
import inspect
|
||||
from collections.abc import Callable, Mapping
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
from rich.console import ConsoleRenderable
|
||||
|
||||
from spiel.triggers import Triggers
|
||||
|
||||
|
||||
@dataclass
|
||||
class Presentable: # Why not an ABC? https://github.com/python/mypy/issues/5374
|
||||
title: str = ""
|
||||
|
||||
def render(self, triggers: Triggers) -> ConsoleRenderable:
|
||||
raise NotImplementedError
|
||||
|
||||
def get_render_kwargs(
|
||||
self, function: Callable[..., ConsoleRenderable], triggers: Triggers
|
||||
) -> Mapping[str, Any]:
|
||||
signature = inspect.signature(function)
|
||||
|
||||
kwargs: dict[str, Any] = {}
|
||||
if "triggers" in signature.parameters:
|
||||
kwargs["triggers"] = triggers
|
||||
|
||||
return kwargs
|
@ -1,42 +0,0 @@
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
from pendulum import DateTime, now
|
||||
from rich.control import Control
|
||||
from rich.style import Style
|
||||
from rich.text import Text
|
||||
from watchdog.events import FileSystemEvent, FileSystemEventHandler
|
||||
|
||||
from spiel.load import load_deck_and_options
|
||||
from spiel.state import State
|
||||
|
||||
|
||||
@dataclass
|
||||
class DeckReloader(FileSystemEventHandler):
|
||||
state: State
|
||||
deck_path: Path
|
||||
last_reload: DateTime = field(default_factory=now)
|
||||
|
||||
def on_modified(self, event: FileSystemEvent) -> None:
|
||||
self.last_reload = now()
|
||||
try:
|
||||
self.state.deck, _ = load_deck_and_options(self.deck_path)
|
||||
self.state.reset_trigger()
|
||||
self.state.set_message(
|
||||
lambda: Text(
|
||||
f"Reloaded deck from {self.deck_path} {self.last_reload.diff_for_humans(None, False)}",
|
||||
style=Style(color="bright_green"),
|
||||
)
|
||||
)
|
||||
except Exception:
|
||||
exc_type, exc_obj, exc_tb = sys.exc_info()
|
||||
self.state.set_message(
|
||||
lambda: Text(
|
||||
f"Error: {self.last_reload.diff_for_humans(None, False)}: {exc_obj!r}{Control.bell()}",
|
||||
style=Style(color="bright_red"),
|
||||
)
|
||||
)
|
||||
|
||||
def __hash__(self) -> int:
|
||||
return hash((type(self), id(self)))
|
@ -0,0 +1,46 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from rich.console import Console, ConsoleOptions, RenderResult
|
||||
from rich.style import Style
|
||||
from rich.table import Column, Table
|
||||
from rich.text import Text
|
||||
|
||||
from spiel.constants import (
|
||||
PACKAGE_NAME,
|
||||
__python_version__,
|
||||
__rich_version__,
|
||||
__textual_version__,
|
||||
__version__,
|
||||
)
|
||||
|
||||
|
||||
class DebugTable:
|
||||
def __rich_console__(self, console: Console, options: ConsoleOptions) -> RenderResult:
|
||||
table = Table(
|
||||
Column(justify="right"),
|
||||
Column(justify="left"),
|
||||
show_header=False,
|
||||
title="Debug Information",
|
||||
)
|
||||
|
||||
table.add_row(f"{PACKAGE_NAME.capitalize()} Version", __version__)
|
||||
table.add_row("Rich Version", __rich_version__)
|
||||
table.add_row("Textual Version", __textual_version__)
|
||||
table.add_row("Python Version", __python_version__)
|
||||
|
||||
table.add_section()
|
||||
|
||||
table.add_row(
|
||||
"Color System",
|
||||
Text(
|
||||
console.color_system or "unknown",
|
||||
style=Style(color="red" if console.color_system != "truecolor" else "green"),
|
||||
),
|
||||
)
|
||||
table.add_row(
|
||||
"Console Dimensions",
|
||||
Text(f"{console.width} cells wide, {console.height} cells tall"),
|
||||
end_section=True,
|
||||
)
|
||||
|
||||
yield table
|
@ -1,30 +0,0 @@
|
||||
import code
|
||||
from collections.abc import Callable, MutableMapping
|
||||
|
||||
import IPython
|
||||
from traitlets.config import Config
|
||||
|
||||
REPLExecutor = Callable[[], None]
|
||||
|
||||
REPLS: MutableMapping[str, REPLExecutor] = {}
|
||||
|
||||
|
||||
def repl(name: str) -> Callable[[REPLExecutor], REPLExecutor]:
|
||||
def register(executor: REPLExecutor) -> REPLExecutor:
|
||||
REPLS[name] = executor
|
||||
return executor
|
||||
|
||||
return register
|
||||
|
||||
|
||||
@repl("builtin")
|
||||
def builtin() -> None:
|
||||
code.InteractiveConsole().interact()
|
||||
|
||||
|
||||
@repl("ipython")
|
||||
def ipython() -> None:
|
||||
c = Config()
|
||||
c.InteractiveShellEmbed.colors = "Neutral"
|
||||
|
||||
IPython.embed(config=c)
|
@ -1,34 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import deque
|
||||
from time import monotonic
|
||||
from typing import Deque
|
||||
|
||||
from spiel.constants import TARGET_RPS
|
||||
|
||||
|
||||
class RPSCounter:
|
||||
def __init__(self, render_history_length: int | None = None) -> None:
|
||||
if render_history_length is None:
|
||||
render_history_length = 3 * TARGET_RPS
|
||||
|
||||
self.render_time_history: Deque[float] = deque(maxlen=render_history_length)
|
||||
|
||||
@property
|
||||
def num_samples(self) -> int:
|
||||
return len(self.render_time_history)
|
||||
|
||||
def mark(self) -> None:
|
||||
self.render_time_history.append(monotonic())
|
||||
|
||||
def renders_per_second(self) -> float:
|
||||
if self.num_samples < 2:
|
||||
return 0
|
||||
|
||||
return self.num_samples / (self.render_time_history[-1] - self.render_time_history[0])
|
||||
|
||||
def last_elapsed_render_time(self) -> float:
|
||||
if self.num_samples < 2:
|
||||
return 0
|
||||
|
||||
return self.render_time_history[-1] - self.render_time_history[-2]
|
@ -0,0 +1,24 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.binding import Binding
|
||||
|
||||
from spiel.screens.screen import SpielScreen
|
||||
from spiel.widgets.footer import Footer
|
||||
from spiel.widgets.minislides import MiniSlides
|
||||
|
||||
|
||||
class DeckScreen(SpielScreen):
|
||||
BINDINGS = [
|
||||
Binding("right", "next_slide", "Go to next slide."),
|
||||
Binding("left", "prev_slide", "Go to previous slide."),
|
||||
Binding("down", "next_row", "Go to next row of slides."),
|
||||
Binding("up", "prev_row", "Go to previous row of slides."),
|
||||
Binding(
|
||||
"escape,enter", "switch_screen('slide')", "Go to Slide view with the selected slide."
|
||||
),
|
||||
]
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield MiniSlides()
|
||||
yield Footer()
|
@ -0,0 +1,40 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.binding import Binding
|
||||
from textual.containers import Container
|
||||
|
||||
from spiel.screens.screen import SpielScreen
|
||||
from spiel.widgets.bindings import AppBindingsTableWidget, ScreenBindingsTableWidget
|
||||
from spiel.widgets.footer import Footer
|
||||
|
||||
|
||||
class HelpScreen(SpielScreen):
|
||||
DEFAULT_CSS = """
|
||||
.h-section {
|
||||
layout: horizontal;
|
||||
height: auto;
|
||||
align: center top;
|
||||
content-align: center top;
|
||||
}
|
||||
"""
|
||||
|
||||
BINDINGS = [
|
||||
Binding("escape,enter", "pop_screen", "Return to the previous view."),
|
||||
]
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Container(
|
||||
AppBindingsTableWidget(),
|
||||
classes="h-section",
|
||||
)
|
||||
yield Container(
|
||||
ScreenBindingsTableWidget(id="slide"),
|
||||
ScreenBindingsTableWidget(id="deck"),
|
||||
classes="h-section",
|
||||
)
|
||||
yield Container(
|
||||
ScreenBindingsTableWidget(id="help"),
|
||||
classes="h-section",
|
||||
)
|
||||
yield Footer()
|
@ -0,0 +1,10 @@
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from textual.screen import Screen
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from spiel.app import SpielApp
|
||||
|
||||
|
||||
class SpielScreen(Screen):
|
||||
app: "SpielApp"
|
@ -0,0 +1,45 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import inspect
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.binding import Binding
|
||||
from textual.events import Key
|
||||
|
||||
from spiel.screens.screen import SpielScreen
|
||||
from spiel.widgets.footer import Footer
|
||||
from spiel.widgets.slide import SlideWidget
|
||||
|
||||
SUSPEND = "suspend"
|
||||
|
||||
|
||||
class SlideScreen(SpielScreen):
|
||||
DEFAULT_CSS = """
|
||||
Screen {
|
||||
layout: vertical;
|
||||
}
|
||||
"""
|
||||
|
||||
BINDINGS = [
|
||||
Binding("right", "next_slide", "Go to next slide."),
|
||||
Binding("left", "prev_slide", "Go to previous slide."),
|
||||
Binding("t", "trigger", "Trigger the current slide."),
|
||||
Binding("r", "reset_trigger", "Reset trigger state."),
|
||||
]
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield SlideWidget()
|
||||
yield Footer()
|
||||
|
||||
def on_key(self, event: Key) -> None:
|
||||
slide = self.app.deck[self.app.current_slide_idx]
|
||||
bind = slide.bindings.get(event.key)
|
||||
|
||||
if callable(bind):
|
||||
signature = inspect.signature(bind)
|
||||
|
||||
kwargs: dict[str, object] = {}
|
||||
if SUSPEND in signature.parameters:
|
||||
kwargs[SUSPEND] = self.app.suspend
|
||||
|
||||
bind(**kwargs)
|
@ -1,24 +1,30 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
import inspect
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Callable, Mapping
|
||||
|
||||
from rich.console import ConsoleRenderable
|
||||
from rich.console import RenderableType
|
||||
from rich.text import Text
|
||||
|
||||
from spiel.presentable import Presentable
|
||||
from spiel.triggers import Triggers
|
||||
|
||||
MakeRenderable = Callable[..., ConsoleRenderable]
|
||||
RenderableLike = MakeRenderable | ConsoleRenderable
|
||||
TRIGGERS = "triggers"
|
||||
|
||||
Content = Callable[..., RenderableType]
|
||||
|
||||
|
||||
@dataclass
|
||||
class Slide(Presentable):
|
||||
content: RenderableLike = field(default_factory=Text)
|
||||
|
||||
def render(self, triggers: Triggers) -> ConsoleRenderable:
|
||||
if callable(self.content):
|
||||
return self.content(**self.get_render_kwargs(function=self.content, triggers=triggers))
|
||||
else:
|
||||
return self.content
|
||||
class Slide:
|
||||
title: str = ""
|
||||
content: Content = lambda: Text()
|
||||
bindings: Mapping[str, Callable[[], None]] = field(default_factory=dict)
|
||||
|
||||
def render(self, triggers: Triggers) -> RenderableType:
|
||||
signature = inspect.signature(self.content)
|
||||
|
||||
kwargs: dict[str, object] = {}
|
||||
if TRIGGERS in signature.parameters:
|
||||
kwargs[TRIGGERS] = triggers
|
||||
|
||||
return self.content(**kwargs)
|
||||
|
@ -1,123 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass, field
|
||||
from functools import cached_property
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
from time import monotonic
|
||||
from types import TracebackType
|
||||
from typing import ContextManager, Type
|
||||
|
||||
from rich.console import Console
|
||||
from rich.style import Style
|
||||
from rich.text import Text
|
||||
|
||||
from spiel.constants import PACKAGE_NAME
|
||||
from spiel.deck import Deck
|
||||
from spiel.load import load_deck_and_options
|
||||
from spiel.modes import Mode
|
||||
from spiel.options import Options
|
||||
from spiel.presentable import Presentable
|
||||
|
||||
TextLike = Text | Callable[[], Text]
|
||||
|
||||
|
||||
@dataclass
|
||||
class State(ContextManager["State"]):
|
||||
console: Console
|
||||
deck: Deck
|
||||
options: Options
|
||||
_current_slide_idx: int = 0
|
||||
_mode: Mode = Mode.SLIDE
|
||||
_message: TextLike = Text("")
|
||||
trigger_times: list[float] = field(default_factory=list)
|
||||
|
||||
@classmethod
|
||||
def from_file(cls, path: Path, console: Console | None = None) -> State:
|
||||
deck, options = load_deck_and_options(path)
|
||||
return cls(console=console or Console(), deck=deck, options=options)
|
||||
|
||||
@property
|
||||
def mode(self) -> Mode:
|
||||
return self._mode
|
||||
|
||||
@mode.setter
|
||||
def mode(self, mode: Mode) -> None:
|
||||
self._mode = mode
|
||||
self.reset_trigger()
|
||||
|
||||
@property
|
||||
def current_slide_idx(self) -> int:
|
||||
return self._current_slide_idx
|
||||
|
||||
@current_slide_idx.setter
|
||||
def current_slide_idx(self, idx: int) -> None:
|
||||
self._current_slide_idx = max(0, min(len(self.deck) - 1, idx))
|
||||
self.reset_trigger()
|
||||
|
||||
def next_slide(self, move: int = 1) -> None:
|
||||
if self.current_slide_idx == len(self.deck) - 1:
|
||||
return
|
||||
self.current_slide_idx += move
|
||||
|
||||
def previous_slide(self, move: int = 1) -> None:
|
||||
if self.current_slide_idx == 0:
|
||||
return
|
||||
self.current_slide_idx -= move
|
||||
|
||||
def jump_to_slide(self, idx: int) -> None:
|
||||
self.current_slide_idx = idx
|
||||
|
||||
@property
|
||||
def current_slide(self) -> Presentable:
|
||||
return self.deck[self.current_slide_idx]
|
||||
|
||||
@property
|
||||
def message(self) -> Text:
|
||||
if callable(self._message):
|
||||
try:
|
||||
return self._message()
|
||||
except Exception:
|
||||
return Text(
|
||||
"Internal Error: failed to display message.",
|
||||
style=Style(color="bright_red"),
|
||||
)
|
||||
else:
|
||||
return self._message
|
||||
|
||||
def set_message(self, message: TextLike) -> None:
|
||||
self._message = message
|
||||
|
||||
def clear_message(self) -> None:
|
||||
self.set_message(Text(""))
|
||||
|
||||
@property
|
||||
def deck_grid_width(self) -> int:
|
||||
return max(self.console.size.width // 30, 1)
|
||||
|
||||
def trigger(self) -> None:
|
||||
self.trigger_times.append(monotonic())
|
||||
|
||||
def reset_trigger(self) -> None:
|
||||
self.trigger_times.clear()
|
||||
self.trigger()
|
||||
|
||||
@cached_property
|
||||
def _tmp_dir(self) -> TemporaryDirectory[str]:
|
||||
return TemporaryDirectory(prefix=f"{PACKAGE_NAME}-")
|
||||
|
||||
@cached_property
|
||||
def tmp_dir(self) -> Path:
|
||||
return Path(self._tmp_dir.name)
|
||||
|
||||
def __enter__(self) -> State:
|
||||
return self
|
||||
|
||||
def __exit__(
|
||||
self,
|
||||
exctype: Type[BaseException] | None,
|
||||
excinst: BaseException | None,
|
||||
exctb: TracebackType | None,
|
||||
) -> None:
|
||||
self._tmp_dir.cleanup()
|
@ -0,0 +1,61 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from rich.console import RenderableType
|
||||
from rich.padding import Padding
|
||||
from rich.table import Column, Table
|
||||
from rich.text import Text
|
||||
from textual.binding import Binding
|
||||
|
||||
from spiel.widgets.widget import SpielWidget
|
||||
|
||||
|
||||
class AppBindingsTableWidget(SpielWidget):
|
||||
DEFAULT_CSS = """
|
||||
AppBindingsTableWidget {
|
||||
width: auto;
|
||||
height: auto;
|
||||
}
|
||||
"""
|
||||
|
||||
def render(self) -> RenderableType:
|
||||
table = Table(
|
||||
Column("Key", justify="left"),
|
||||
Column("Description", justify="left"),
|
||||
title=f"All Views",
|
||||
)
|
||||
|
||||
for binding in self.app.BINDINGS:
|
||||
if isinstance(binding, Binding):
|
||||
table.add_row(binding.key, binding.description)
|
||||
else:
|
||||
raise TypeError(f"{binding} on {self.app} needs to be a {Binding.__name__}")
|
||||
|
||||
return Padding(table, pad=1)
|
||||
|
||||
|
||||
class ScreenBindingsTableWidget(SpielWidget):
|
||||
DEFAULT_CSS = """
|
||||
ScreenBindingsTableWidget {
|
||||
width: auto;
|
||||
height: auto;
|
||||
}
|
||||
"""
|
||||
|
||||
def render(self) -> RenderableType:
|
||||
if self.id is None:
|
||||
return Text("")
|
||||
|
||||
screen = self.app.get_screen(self.id)
|
||||
table = Table(
|
||||
Column("Key", justify="left"),
|
||||
Column("Description", justify="left"),
|
||||
title=f"{self.id.title()} View",
|
||||
)
|
||||
|
||||
for binding in screen.BINDINGS:
|
||||
if isinstance(binding, Binding):
|
||||
table.add_row(binding.key, binding.description)
|
||||
else:
|
||||
raise TypeError(f"{binding} on {screen} needs to be a {Binding.__name__}")
|
||||
|
||||
return Padding(table, pad=1)
|
@ -0,0 +1,55 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from rich.console import Group, RenderableType
|
||||
from rich.rule import Rule
|
||||
from rich.style import Style
|
||||
from rich.table import Column, Table
|
||||
from rich.text import Text
|
||||
from textual.reactive import reactive
|
||||
|
||||
from spiel.constants import FOOTER_TIME_FORMAT
|
||||
from spiel.widgets.widget import SpielWidget
|
||||
|
||||
|
||||
class Footer(SpielWidget):
|
||||
DEFAULT_CSS = """
|
||||
Footer {
|
||||
color: $text;
|
||||
dock: bottom;
|
||||
height: 2;
|
||||
}
|
||||
"""
|
||||
|
||||
now: datetime = reactive(datetime.now) # type: ignore[arg-type,assignment]
|
||||
|
||||
def on_mount(self) -> None:
|
||||
super().on_mount()
|
||||
|
||||
self.set_interval(1 / 60, self.update_now)
|
||||
|
||||
def update_now(self) -> None:
|
||||
self.now = datetime.now()
|
||||
|
||||
@property
|
||||
def longest_slide_number_length(self) -> int:
|
||||
num_slides = len(self.app.deck)
|
||||
return len(str(num_slides))
|
||||
|
||||
def render(self) -> RenderableType:
|
||||
grid = Table.grid(
|
||||
Column(style=Style(dim=True), justify="left"),
|
||||
Column(style=Style(bold=True), justify="center"),
|
||||
Column(style=Style(dim=True), justify="right"),
|
||||
expand=True,
|
||||
padding=1,
|
||||
)
|
||||
grid.add_row(
|
||||
Text(f"{self.app.deck.name} | {self.app.deck[self.app.current_slide_idx].title}"),
|
||||
self.app.message,
|
||||
Text(
|
||||
f"{self.now.strftime(FOOTER_TIME_FORMAT)} [{self.app.current_slide_idx + 1:>0{self.longest_slide_number_length}d} / {len(self.app.deck)}]"
|
||||
),
|
||||
)
|
||||
return Group(Rule(style=Style(dim=True)), grid)
|
@ -0,0 +1,70 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from itertools import islice
|
||||
from math import ceil
|
||||
|
||||
from rich.console import RenderableType
|
||||
from rich.layout import Layout
|
||||
from rich.panel import Panel
|
||||
from rich.style import Style
|
||||
from rich.text import Text
|
||||
|
||||
from spiel.triggers import Triggers
|
||||
from spiel.utils import clamp
|
||||
from spiel.widgets.widget import SpielWidget
|
||||
|
||||
|
||||
class MiniSlides(SpielWidget):
|
||||
def render(self) -> RenderableType:
|
||||
grid_width = self.app.deck_grid_width
|
||||
row_of_current_slide = self.app.current_slide_idx // grid_width
|
||||
num_rows = ceil(len(self.app.deck) / grid_width)
|
||||
start_row = clamp(
|
||||
value=row_of_current_slide - (grid_width // 2),
|
||||
lower=0,
|
||||
upper=max(num_rows - grid_width, 0),
|
||||
)
|
||||
start_slide_idx = grid_width * start_row
|
||||
slides = islice(enumerate(self.app.deck.slides), start_slide_idx, None)
|
||||
|
||||
rows = [Layout(name=str(r)) for r in range(grid_width)]
|
||||
cols = [[Layout(name=f"{r}-{c}") for c in range(grid_width)] for r, _ in enumerate(rows)]
|
||||
|
||||
root = Layout()
|
||||
root.split_column(*rows)
|
||||
|
||||
for row, layouts in zip(rows, cols):
|
||||
row.split_row(*layouts)
|
||||
|
||||
for layout in layouts:
|
||||
slide_idx, slide = next(slides, (None, None))
|
||||
if slide_idx is None or slide is None:
|
||||
layout.update("")
|
||||
else:
|
||||
is_active_slide = slide_idx == self.app.current_slide_idx
|
||||
|
||||
try:
|
||||
content = slide.render(triggers=Triggers.new())
|
||||
border_style = Style(
|
||||
color="bright_cyan" if is_active_slide else None,
|
||||
dim=not is_active_slide,
|
||||
)
|
||||
except Exception as e:
|
||||
content = Text(
|
||||
f"Failed to render slide {slide_idx + 1} due to:\n{e}",
|
||||
style=Style(color="red"),
|
||||
)
|
||||
border_style = Style(
|
||||
color="red1",
|
||||
dim=not is_active_slide,
|
||||
)
|
||||
|
||||
layout.update(
|
||||
Panel(
|
||||
content,
|
||||
title=" | ".join((str(slide_idx + 1), slide.title)),
|
||||
border_style=border_style,
|
||||
)
|
||||
)
|
||||
|
||||
return root
|
@ -0,0 +1,50 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from time import monotonic
|
||||
|
||||
from rich.box import HEAVY
|
||||
from rich.console import RenderableType
|
||||
from rich.panel import Panel
|
||||
from rich.style import Style
|
||||
from rich.traceback import Traceback
|
||||
from textual.reactive import reactive
|
||||
|
||||
import spiel
|
||||
from spiel.exceptions import SpielException
|
||||
from spiel.triggers import Triggers
|
||||
from spiel.widgets.widget import SpielWidget
|
||||
|
||||
|
||||
class SlideWidget(SpielWidget):
|
||||
triggers: Triggers = reactive(Triggers.new) # type: ignore[assignment,arg-type]
|
||||
|
||||
def on_mount(self) -> None:
|
||||
super().on_mount()
|
||||
|
||||
self.set_interval(1 / 60, self.update_triggers)
|
||||
|
||||
def update_triggers(self) -> None:
|
||||
self.triggers = Triggers(now=monotonic(), times=self.triggers.times)
|
||||
|
||||
def render(self) -> RenderableType:
|
||||
try:
|
||||
self.remove_class("error")
|
||||
slide = self.app.deck[self.app.current_slide_idx]
|
||||
return slide.render(triggers=self.triggers)
|
||||
except Exception:
|
||||
self.add_class("error")
|
||||
et, ev, tr = sys.exc_info()
|
||||
if et is None or ev is None or tr is None:
|
||||
raise SpielException("Expected to be handling an exception, but wasn't.")
|
||||
return Panel(
|
||||
Traceback.from_exception(
|
||||
exc_type=et,
|
||||
exc_value=ev,
|
||||
traceback=tr,
|
||||
suppress=(spiel,),
|
||||
),
|
||||
title="Slide failed to render",
|
||||
border_style=Style(bold=True, color="red1"),
|
||||
box=HEAVY,
|
||||
)
|
@ -0,0 +1,21 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from textual.reactive import watch
|
||||
from textual.widget import Widget
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from spiel.app import SpielApp
|
||||
|
||||
|
||||
class SpielWidget(Widget):
|
||||
app: "SpielApp"
|
||||
|
||||
def on_mount(self) -> None:
|
||||
watch(self.app, "deck", self.r)
|
||||
watch(self.app, "current_slide_idx", self.r)
|
||||
watch(self.app, "message", self.r)
|
||||
|
||||
def r(self, _: object) -> None:
|
||||
self.refresh()
|
@ -1,20 +1,10 @@
|
||||
import pytest
|
||||
from spiel import Triggers
|
||||
from spiel.demo.demo import DemoRenderFailure, deck
|
||||
|
||||
from spiel.main import DEMO_SOURCE
|
||||
from spiel.present import render_slide
|
||||
from spiel.state import State
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def state() -> State:
|
||||
return State.from_file(DEMO_SOURCE)
|
||||
|
||||
|
||||
def test_can_render_every_demo_slide(state: State) -> None:
|
||||
deck = state.deck
|
||||
|
||||
def test_can_render_every_demo_slide() -> None:
|
||||
for slide in deck:
|
||||
for _ in range(10):
|
||||
state.console.print(render_slide(state, slide))
|
||||
state.trigger()
|
||||
state.reset_trigger()
|
||||
try:
|
||||
slide.render(triggers=Triggers.new())
|
||||
except DemoRenderFailure:
|
||||
pass
|
||||
|
@ -1,39 +0,0 @@
|
||||
from collections.abc import Callable
|
||||
from io import StringIO
|
||||
|
||||
import pytest
|
||||
from rich.console import Console
|
||||
from rich.layout import Layout
|
||||
from rich.text import Text
|
||||
|
||||
from spiel import Slide
|
||||
from spiel.present import render_slide, split_layout_into_deck_grid
|
||||
from spiel.state import State
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"make_slide",
|
||||
[
|
||||
lambda: Slide(content=Text("foobar")),
|
||||
lambda: Slide(content=lambda: Text("foobar")),
|
||||
lambda: Slide(content=lambda triggers: Text("foobar")),
|
||||
],
|
||||
)
|
||||
def test_can_render_slide(
|
||||
make_slide: Callable[[], Slide],
|
||||
console: Console,
|
||||
output: StringIO,
|
||||
three_slide_state: State,
|
||||
) -> None:
|
||||
renderable = render_slide(state=three_slide_state, slide=make_slide())
|
||||
|
||||
console.print(renderable)
|
||||
|
||||
result = output.getvalue()
|
||||
|
||||
assert "foobar" in result
|
||||
|
||||
|
||||
def test_can_render_deck_grid(three_slide_state: State) -> None:
|
||||
root = Layout()
|
||||
split_layout_into_deck_grid(root, three_slide_state)
|
@ -1,17 +0,0 @@
|
||||
from io import StringIO
|
||||
|
||||
from rich.console import Console
|
||||
|
||||
from spiel.footer import Footer
|
||||
from spiel.rps import RPSCounter
|
||||
from spiel.state import State
|
||||
|
||||
|
||||
def test_deck_name_in_footer(console: Console, output: StringIO, three_slide_state: State) -> None:
|
||||
footer = Footer(state=three_slide_state, rps_counter=RPSCounter())
|
||||
|
||||
console.print(footer)
|
||||
|
||||
result = output.getvalue()
|
||||
print(repr(result))
|
||||
assert three_slide_state.deck.name in result
|
@ -1,8 +0,0 @@
|
||||
from rich.console import Console
|
||||
|
||||
from spiel.help import Help
|
||||
from spiel.state import State
|
||||
|
||||
|
||||
def test_can_render_help(console: Console, three_slide_state: State) -> None:
|
||||
console.print(Help(three_slide_state))
|
@ -1,93 +0,0 @@
|
||||
import os
|
||||
import string
|
||||
from random import sample
|
||||
|
||||
import pytest
|
||||
from hypothesis import given, settings
|
||||
from hypothesis import strategies as st
|
||||
from rich.console import Console
|
||||
from rich.text import Text
|
||||
from typer import Exit
|
||||
|
||||
from spiel import Deck, Options, Slide
|
||||
from spiel.input import (
|
||||
INPUT_HANDLERS,
|
||||
InputHandler,
|
||||
deck_mode,
|
||||
edit_example,
|
||||
edit_options,
|
||||
exit,
|
||||
jump_to_slide,
|
||||
next_slide,
|
||||
open_repl,
|
||||
previous_slide,
|
||||
slide_mode,
|
||||
)
|
||||
from spiel.modes import Mode
|
||||
from spiel.state import State
|
||||
|
||||
|
||||
def test_next_slide_goes_to_next_slide(three_slide_state: State) -> None:
|
||||
next_slide(three_slide_state)
|
||||
|
||||
assert three_slide_state.current_slide is three_slide_state.deck[1]
|
||||
|
||||
|
||||
def test_previous_slide_goes_to_previous_slide(three_slide_state: State) -> None:
|
||||
three_slide_state.jump_to_slide(2)
|
||||
|
||||
previous_slide(three_slide_state)
|
||||
|
||||
assert three_slide_state.current_slide is three_slide_state.deck[1]
|
||||
|
||||
|
||||
def test_enter_deck_mode(three_slide_state: State) -> None:
|
||||
deck_mode(three_slide_state)
|
||||
|
||||
assert three_slide_state.mode is Mode.DECK
|
||||
|
||||
|
||||
def test_enter_slide_mode(three_slide_state: State) -> None:
|
||||
slide_mode(three_slide_state)
|
||||
|
||||
assert three_slide_state.mode is Mode.SLIDE
|
||||
|
||||
|
||||
def test_kill(three_slide_state: State) -> None:
|
||||
with pytest.raises(Exit):
|
||||
exit(three_slide_state)
|
||||
|
||||
|
||||
TESTABLE_INPUT_HANDLERS = list(
|
||||
set(INPUT_HANDLERS.values()).difference(
|
||||
{
|
||||
exit,
|
||||
jump_to_slide,
|
||||
open_repl,
|
||||
edit_options,
|
||||
edit_example,
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@given(input_handlers=st.lists(st.sampled_from(TESTABLE_INPUT_HANDLERS)))
|
||||
@settings(max_examples=2_000 if os.getenv("CI") else 200)
|
||||
def test_input_sequences_dont_crash(input_handlers: list[InputHandler]) -> None:
|
||||
state = State(
|
||||
console=Console(),
|
||||
deck=Deck(
|
||||
name="deck",
|
||||
slides=[
|
||||
Slide(
|
||||
content=Text(f"This is slide {n + 1}"),
|
||||
title="".join(sample(string.ascii_letters, 30)),
|
||||
)
|
||||
for n in range(30)
|
||||
],
|
||||
),
|
||||
options=Options(),
|
||||
)
|
||||
|
||||
for input_handler in input_handlers:
|
||||
input_handler(state)
|
@ -1,62 +0,0 @@
|
||||
from io import StringIO
|
||||
|
||||
import pytest
|
||||
from pytest_mock import MockFixture
|
||||
from rich.console import Console
|
||||
|
||||
from spiel.input import SPECIAL_CHARACTERS, SpecialCharacters, get_character, handle_input
|
||||
from spiel.modes import Mode
|
||||
from spiel.state import State
|
||||
|
||||
|
||||
@pytest.mark.parametrize("input, expected", SPECIAL_CHARACTERS.items())
|
||||
def test_get_character_recognizes_special_characters(
|
||||
input: str, expected: SpecialCharacters
|
||||
) -> None:
|
||||
io = StringIO(input)
|
||||
|
||||
assert get_character(io) == expected
|
||||
|
||||
|
||||
def test_handle_input_calls_matching_handler_and_returns_its_return_value(
|
||||
console: Console, three_slide_state: State, mocker: MockFixture
|
||||
) -> None:
|
||||
mock = mocker.MagicMock(return_value="foobar")
|
||||
|
||||
result = handle_input(
|
||||
state=three_slide_state,
|
||||
stream=StringIO("a"),
|
||||
handlers={("a", three_slide_state.mode): mock},
|
||||
)
|
||||
|
||||
assert mock.called
|
||||
assert result == "foobar"
|
||||
|
||||
|
||||
def test_handle_input_returns_none_for_missed_input_based_on_character(
|
||||
console: Console, three_slide_state: State, mocker: MockFixture
|
||||
) -> None:
|
||||
mock = mocker.MagicMock(return_value="foobar")
|
||||
|
||||
result = handle_input(
|
||||
state=three_slide_state,
|
||||
stream=StringIO("a"),
|
||||
handlers={("b", three_slide_state.mode): mock},
|
||||
)
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_handle_input_returns_none_for_missed_input_based_on_mode(
|
||||
console: Console, three_slide_state: State, mocker: MockFixture
|
||||
) -> None:
|
||||
mock = mocker.MagicMock(return_value="foobar")
|
||||
three_slide_state.mode = Mode.SLIDE
|
||||
|
||||
result = handle_input(
|
||||
state=three_slide_state,
|
||||
stream=StringIO("a"),
|
||||
handlers={("a", Mode.HELP): mock},
|
||||
)
|
||||
|
||||
assert result is None
|
@ -1,53 +0,0 @@
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
from _pytest.tmpdir import TempPathFactory
|
||||
from hypothesis import given
|
||||
from hypothesis import strategies as st
|
||||
from hypothesis.strategies import SearchStrategy
|
||||
from rich.console import Console
|
||||
|
||||
from spiel import Options
|
||||
from spiel.exceptions import InvalidOptionValue
|
||||
from spiel.repls import REPLS
|
||||
|
||||
|
||||
def valid_options() -> SearchStrategy[Options]:
|
||||
return st.builds(
|
||||
Options,
|
||||
profiling=st.booleans(),
|
||||
repl=st.sampled_from(list(REPLS.keys())),
|
||||
)
|
||||
|
||||
|
||||
@given(o=valid_options())
|
||||
def test_round_trip_to_dict(o: Options) -> None:
|
||||
assert o == Options.from_dict(o.as_dict())
|
||||
|
||||
|
||||
@given(o=valid_options())
|
||||
def test_round_trip_to_toml(o: Options) -> None:
|
||||
assert o == Options.from_toml(o.as_toml())
|
||||
|
||||
|
||||
@given(o=valid_options())
|
||||
def test_round_trip_to_file(o: Options, tmp_path_factory: TempPathFactory) -> None:
|
||||
dir = tmp_path_factory.mktemp(basename="options-roundtrip")
|
||||
path = dir / "options.toml"
|
||||
|
||||
assert o == Options.load(o.save(path))
|
||||
|
||||
|
||||
def test_can_render_options(console: Console, three_slide_options: Options) -> None:
|
||||
console.print(three_slide_options)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"key, value",
|
||||
[
|
||||
("repl", "foobar"),
|
||||
],
|
||||
)
|
||||
def test_reject_invalid_option_values(key: str, value: Any) -> None:
|
||||
with pytest.raises(InvalidOptionValue):
|
||||
Options(**{key: value})
|
@ -1,82 +0,0 @@
|
||||
from io import StringIO
|
||||
from pathlib import Path
|
||||
from textwrap import dedent
|
||||
from time import sleep
|
||||
|
||||
from rich.console import Console
|
||||
|
||||
from spiel.constants import DECK
|
||||
from spiel.load import DeckWatcher
|
||||
from spiel.reloader import DeckReloader
|
||||
from spiel.state import State
|
||||
|
||||
|
||||
def test_reloader_triggers_when_file_modified(
|
||||
file_with_empty_deck: Path,
|
||||
console: Console,
|
||||
output: StringIO,
|
||||
) -> None:
|
||||
state = State.from_file(file_with_empty_deck)
|
||||
reloader = DeckReloader(state=state, deck_path=file_with_empty_deck)
|
||||
|
||||
with DeckWatcher(event_handler=reloader, path=file_with_empty_deck, poll=True):
|
||||
sleep(0.01)
|
||||
|
||||
file_with_empty_deck.write_text(
|
||||
dedent(
|
||||
f"""\
|
||||
from spiel import Deck
|
||||
|
||||
{DECK} = Deck(name="modified")
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
sleep(0.01)
|
||||
|
||||
for attempt in range(10):
|
||||
console.print(state.message)
|
||||
result = output.getvalue()
|
||||
if state.deck.name == "modified" and "Reloaded deck" in result:
|
||||
return # test succeeded
|
||||
sleep(0.1)
|
||||
|
||||
assert (
|
||||
False
|
||||
), f"Reloader never triggered, current file contents:\n{file_with_empty_deck.read_text()}" # pragma: debugging
|
||||
|
||||
|
||||
def test_reloader_captures_error_in_message(
|
||||
file_with_empty_deck: Path,
|
||||
console: Console,
|
||||
output: StringIO,
|
||||
) -> None:
|
||||
state = State.from_file(file_with_empty_deck)
|
||||
reloader = DeckReloader(state=state, deck_path=file_with_empty_deck)
|
||||
|
||||
with DeckWatcher(event_handler=reloader, path=file_with_empty_deck, poll=True):
|
||||
sleep(0.01)
|
||||
|
||||
file_with_empty_deck.write_text(
|
||||
dedent(
|
||||
f"""\
|
||||
from spiel import Deck
|
||||
|
||||
{DECK} = Deck(name="modified")
|
||||
foobar
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
sleep(0.01)
|
||||
|
||||
for attempt in range(10):
|
||||
console.print(state.message)
|
||||
result = output.getvalue()
|
||||
if "NameError" in result and "foobar" in result:
|
||||
return # test succeeded
|
||||
sleep(0.1)
|
||||
|
||||
assert (
|
||||
False
|
||||
), f"Reloader never triggered, current file contents:\n{file_with_empty_deck.read_text()}" # pragma: debugging
|
@ -1,41 +0,0 @@
|
||||
import pytest
|
||||
|
||||
from spiel.rps import RPSCounter
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def counter() -> RPSCounter:
|
||||
return RPSCounter()
|
||||
|
||||
|
||||
def test_renders_per_second(counter: RPSCounter) -> None:
|
||||
# 3 renders in 4 seconds
|
||||
counter.render_time_history.extend([1, 2, 5])
|
||||
|
||||
assert counter.renders_per_second() == 3 / 4
|
||||
|
||||
|
||||
def test_not_enough_samples_for_renders_per_second(counter: RPSCounter) -> None:
|
||||
counter.render_time_history.extend([1])
|
||||
|
||||
# 1 sample isn't enough
|
||||
|
||||
assert counter.renders_per_second() == 0
|
||||
|
||||
|
||||
def test_last_elapsed_render_time(counter: RPSCounter) -> None:
|
||||
counter.render_time_history.extend([1, 2, 5])
|
||||
|
||||
assert counter.last_elapsed_render_time() == 3
|
||||
|
||||
|
||||
def test_not_enough_samples_last_elapsed_render_time(counter: RPSCounter) -> None:
|
||||
counter.render_time_history.extend([1])
|
||||
|
||||
# 1 sample isn't enough
|
||||
|
||||
assert counter.last_elapsed_render_time() == 0
|
||||
|
||||
|
||||
def test_custom_length() -> None:
|
||||
assert RPSCounter(render_history_length=5).render_time_history.maxlen == 5
|
@ -0,0 +1,5 @@
|
||||
from spiel import Slide, Triggers
|
||||
|
||||
|
||||
def test_can_render_default_slide() -> None:
|
||||
Slide().render(triggers=Triggers.new())
|
@ -1,102 +0,0 @@
|
||||
import pytest
|
||||
from rich.console import Console
|
||||
from rich.style import Style
|
||||
from rich.text import Text
|
||||
|
||||
from spiel import Deck, Options
|
||||
from spiel.state import State, TextLike
|
||||
|
||||
|
||||
def test_initial_state_has_first_slide_current(three_slide_state: State) -> None:
|
||||
assert three_slide_state.current_slide is three_slide_state.deck[0]
|
||||
|
||||
|
||||
def test_next_from_first_to_second(three_slide_state: State) -> None:
|
||||
three_slide_state.next_slide()
|
||||
assert three_slide_state.current_slide is three_slide_state.deck[1]
|
||||
|
||||
|
||||
def test_next_from_first_to_third(three_slide_state: State) -> None:
|
||||
three_slide_state.next_slide(move=2)
|
||||
assert three_slide_state.current_slide is three_slide_state.deck[2]
|
||||
|
||||
|
||||
def test_jump_to_third_slide(three_slide_state: State) -> None:
|
||||
three_slide_state.jump_to_slide(2)
|
||||
assert three_slide_state.current_slide is three_slide_state.deck[2]
|
||||
|
||||
|
||||
def test_jump_before_beginning_results_in_beginning(three_slide_state: State) -> None:
|
||||
three_slide_state.jump_to_slide(-5)
|
||||
assert three_slide_state.current_slide is three_slide_state.deck[0]
|
||||
|
||||
|
||||
def test_jump_past_end_results_in_end(three_slide_state: State) -> None:
|
||||
three_slide_state.jump_to_slide(len(three_slide_state.deck) + 5)
|
||||
assert three_slide_state.current_slide is three_slide_state.deck[-1]
|
||||
|
||||
|
||||
def test_next_from_last_slide_stays_put(three_slide_state: State) -> None:
|
||||
three_slide_state.jump_to_slide(2)
|
||||
|
||||
three_slide_state.next_slide()
|
||||
assert three_slide_state.current_slide is three_slide_state.deck[2]
|
||||
|
||||
|
||||
def test_previous_from_first_slide_stays_put(three_slide_state: State) -> None:
|
||||
three_slide_state.previous_slide()
|
||||
|
||||
assert three_slide_state.current_slide is three_slide_state.deck[0]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"width, expected",
|
||||
[
|
||||
(20, 1),
|
||||
(30, 1),
|
||||
(40, 1),
|
||||
(60, 2),
|
||||
(80, 2),
|
||||
(95, 3),
|
||||
(120, 4),
|
||||
],
|
||||
)
|
||||
def test_deck_grid_width(width: int, expected: int) -> None:
|
||||
console = Console(width=width)
|
||||
state = State(console=console, deck=Deck(name="deck"), options=Options())
|
||||
|
||||
assert state.deck_grid_width == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"message, expected",
|
||||
[
|
||||
(Text("foobar"), Text("foobar")),
|
||||
(lambda: Text("wizbang"), Text("wizbang")),
|
||||
(
|
||||
lambda: 1 / 0,
|
||||
Text(
|
||||
"Internal Error: failed to display message.",
|
||||
style=Style(color="bright_red"),
|
||||
),
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_set_message(message: TextLike, expected: Text, three_slide_state: State) -> None:
|
||||
three_slide_state.set_message(message)
|
||||
|
||||
assert three_slide_state.message == expected
|
||||
|
||||
|
||||
def test_clear_message(three_slide_state: State) -> None:
|
||||
three_slide_state.set_message(Text("foobar"))
|
||||
|
||||
three_slide_state.clear_message()
|
||||
|
||||
assert three_slide_state.message == Text("")
|
||||
|
||||
|
||||
def test_tmp_dir_lifecycle(three_slide_state: State) -> None:
|
||||
with three_slide_state:
|
||||
assert three_slide_state.tmp_dir.exists()
|
||||
assert not three_slide_state.tmp_dir.exists()
|
Loading…
Reference in New Issue