You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
spiel/spiel/app.py

279 lines
9.3 KiB
Python

from __future__ import annotations
import asyncio
import code
import datetime
import importlib.util
import os
import sys
from asyncio import wait
from contextlib import contextmanager, redirect_stderr, redirect_stdout
from functools import cached_property, partial
from pathlib import Path
from time import monotonic
from typing import Callable, ClassVar, ContextManager, Iterator, List, Tuple
from rich.style import Style
from rich.text import Text
from textual import log
from textual.app import App
from textual.binding import Binding
from textual.events import Resize
from textual.reactive import reactive
from watchfiles import awatch
from spiel.constants import DECK, RELOAD_MESSAGE_TIME_FORMAT
from spiel.deck import Deck
from spiel.exceptions import NoDeckFound
from spiel.screens.deck import DeckScreen
from spiel.screens.help import HelpScreen
from spiel.screens.slide import SlideScreen
from spiel.screens.transition import SlideTransitionScreen
from spiel.transitions.protocol import Direction
from spiel.triggers import Triggers
from spiel.utils import clamp
from spiel.widgets.slide import SlideWidget
def load_deck(path: Path) -> Deck:
module_name = "__deck"
spec = importlib.util.spec_from_file_location(module_name, path)
if spec is None:
raise NoDeckFound(f"{path.resolve()} does not appear to be an importable Python module.")
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
loader = spec.loader
assert loader is not None
loader.exec_module(module)
try:
deck = getattr(module, DECK)
except AttributeError:
raise NoDeckFound(f"The module at {path} does not have an attribute named {DECK}.")
if not isinstance(deck, Deck):
raise NoDeckFound(
f"The module at {path} has an attribute named {DECK}, but it is a {type(deck).__name__}, not a {Deck.__name__}."
)
return deck
SuspendType = Callable[[], ContextManager[None]]
class SpielApp(App[None]):
CSS_PATH = "spiel.css"
BINDINGS: ClassVar[List[Binding | Tuple[str, str, str]]] = [
Binding("d", "switch_screen('deck')", "Go to the Deck view."),
Binding("question_mark", "push_screen('help')", "Go to the Help view."),
Binding("i", "repl", "Switch to the REPL."),
Binding("p", "screenshot", "Take a screenshot."),
]
deck = reactive(Deck(name="New Deck"))
current_slide_idx = reactive(0)
message = reactive(Text(""))
def __init__(
self,
deck_path: Path,
watch_path: Path | None = None,
_show_messages: bool = True,
_fixed_time: datetime.datetime | None = None,
_fixed_triggers: Triggers | None = None,
_enable_transitions: bool = True,
_slide_refresh_rate: float = 1 / 60,
) -> None:
super().__init__()
self.deck_path = deck_path
self.watch_path = watch_path
self.show_messages = _show_messages
self.fixed_time = _fixed_time
self.fixed_triggers = _fixed_triggers
self.enable_transitions = _enable_transitions
self.slide_refresh_rate = _slide_refresh_rate
async def on_mount(self) -> None:
self.deck = load_deck(self.deck_path)
self.reloader = asyncio.create_task(self.reload())
self.install_screen(SlideScreen(), name="slide")
self.install_screen(DeckScreen(), name="deck")
self.install_screen(HelpScreen(), name="help")
await self.push_screen("slide")
async def reload(self) -> None:
if self.watch_path is None:
return
log(f"Watching {self.watch_path} for changes")
async for changes in awatch(self.watch_path):
change_msg = "\n ".join([""] + [f"{k.raw_str()}: {v}" for k, v in changes])
log(f"Reloading deck from {self.deck_path} due to detected file changes:{change_msg}")
try:
self.deck = load_deck(self.deck_path)
self.current_slide_idx = clamp(self.current_slide_idx, 0, len(self.deck))
self.set_message_temporarily(
Text(
f"Reloaded deck at {datetime.datetime.now().strftime(RELOAD_MESSAGE_TIME_FORMAT)}",
style=Style(dim=True),
),
delay=10,
)
except Exception as e:
self.set_message_temporarily(
Text(
f"Failed to reload deck at {datetime.datetime.now().strftime(RELOAD_MESSAGE_TIME_FORMAT)} due to: {e}",
style=Style(color="red"),
),
delay=10,
)
def on_resize(self, event: Resize) -> None:
self.set_message_temporarily(
message=Text(f"Screen resized to {event.size}", style=Style(dim=True)), delay=2
)
def set_message_temporarily(self, message: Text, delay: float) -> None:
if not self.show_messages:
return
self.message = message
def clear() -> None:
if self.message is message:
self.message = Text("")
self.set_timer(delay, clear)
async def action_next_slide(self) -> None:
await self.handle_new_slide(self.current_slide_idx + 1, Direction.Next)
async def action_prev_slide(self) -> None:
await self.handle_new_slide(self.current_slide_idx - 1, Direction.Previous)
async def handle_new_slide(self, new_slide_idx: int, direction: Direction) -> None:
new_slide_idx = clamp(new_slide_idx, 0, len(self.deck) - 1)
current_slide = self.deck[self.current_slide_idx]
new_slide = self.deck[new_slide_idx]
transition = new_slide.transition or self.deck.default_transition
if (
self.current_slide_idx == new_slide_idx
or not isinstance(self.screen, SlideScreen)
or transition is None
or not self.enable_transitions
):
self.current_slide_idx = new_slide_idx
return
transition_screen = SlideTransitionScreen(
from_slide=current_slide,
from_triggers=self.query_one(SlideWidget).triggers,
to_slide=new_slide,
direction=direction,
transition=transition,
)
await self.switch_screen(transition_screen)
transition_screen.animate(
"progress",
value=100,
delay=0,
duration=0.75,
on_complete=lambda: self.finalize_transition(new_slide_idx),
)
async def finalize_transition(self, new_slide_idx: int) -> None:
await self.switch_screen("slide")
self.current_slide_idx = new_slide_idx
def action_next_row(self) -> None:
self.current_slide_idx = clamp(
self.current_slide_idx + self.deck_grid_width, 0, len(self.deck) - 1
)
def action_prev_row(self) -> None:
self.current_slide_idx = clamp(
self.current_slide_idx - self.deck_grid_width, 0, len(self.deck) - 1
)
def watch_deck(self, new_deck: Deck) -> None:
self.title = new_deck.name
def watch_current_slide_idx(self, new_current_slide_idx: int) -> None:
self.query_one(SlideWidget).triggers = self.fixed_triggers or Triggers.new()
self.sub_title = self.deck[new_current_slide_idx].title
def action_trigger(self) -> None:
now = monotonic()
slide_widget = self.query_one(SlideWidget)
slide_widget.triggers = Triggers(now=now, _times=(*slide_widget.triggers._times, now))
def action_reset_trigger(self) -> None:
slide_widget = self.query_one(SlideWidget)
slide_widget.triggers = Triggers.new()
@cached_property
def repl(self) -> Callable[[], None]:
# Lazily enable readline support
try:
import readline # noqa: F401
except ImportError:
pass
self.console.clear() # clear the console the first time we go into the repl
sys.stdout.flush()
repl = code.InteractiveConsole()
return partial(repl.interact, banner="", exitmsg="")
def action_repl(self) -> None:
with self.suspend():
self.repl()
async def action_quit(self) -> None:
self.reloader.cancel()
await wait([self.reloader], timeout=1)
await super().action_quit()
@contextmanager
def suspend(self) -> Iterator[None]:
driver = self._driver
if driver is not None:
driver.stop_application_mode()
with redirect_stdout(sys.__stdout__), redirect_stderr(sys.__stderr__):
yield
driver.start_application_mode()
@property
def deck_grid_width(self) -> int:
return max(self.size.width // 35, 1)
def present(deck_path: Path | str, watch_path: Path | str | None = None) -> None:
"""
Present the deck defined in the given `deck_path`.
Args:
deck_path: The file to look for a deck in.
watch_path: When filesystem changes are detected below this path (recursively), reload the deck from the `deck_path`.
If `None` (the default), use the parent directory of the `deck_path`.
"""
os.environ["TEXTUAL"] = ",".join(sorted({"debug", "devtools"}))
deck_path = Path(deck_path).resolve()
watch_path = Path(watch_path or deck_path.parent).resolve()
SpielApp(deck_path=deck_path, watch_path=watch_path).run()