"""Per-output workspace switcher for the i3 window manager. This module powers the ``pi3-smart-workspace`` CLI. When invoked with ``-i N``, it switches to the N-th workspace declared in the user's i3 config *for the output the mouse cursor is currently on* — rather than the N-th globally-numbered workspace, which is i3's default. The flow on each invocation: 1. Connect to i3 over the IPC socket and discover which outputs are active right now (no shelling out to ``xrandr``). 2. Read the live i3 config via ``i3.get_config()`` and parse it with :func:`parse_workspaces_per_output` into ``{output: [workspace, ...]}``, in the order workspaces are declared in the config. 3. Read the current cursor position and use :func:`find_output_at_cursor` to decide which output the cursor is on. 4. Issue ``move container to workspace …`` and/or ``workspace …`` via ``i3.command(...)``. The two parsing/geometry helpers (:func:`parse_workspaces_per_output` and :func:`find_output_at_cursor`) are pure functions and live here to be unit-tested without an X display or a running i3 session. Everything that touches the outside world is confined to :func:`run`. """ from __future__ import annotations import argparse import glob import os import pathlib import pprint import re import sys from dataclasses import dataclass from typing import Iterable import i3ipc DEFAULT_INIT_DIR = pathlib.Path("~/.config/i3/pi3-smart-workspace") DEFAULT_WORKSPACES_PER_OUTPUT = 8 @dataclass(frozen=True) class Rect: """Axis-aligned rectangle in screen coordinates. Mirrors the relevant fields of an ``i3ipc`` output rect so that :func:`find_output_at_cursor` can stay free of any ``i3ipc`` dependency and be exercised by plain unit tests. Attributes: x: X coordinate of the top-left corner, in pixels. y: Y coordinate of the top-left corner, in pixels. width: Width of the rectangle, in pixels. height: Height of the rectangle, in pixels. """ x: int y: int width: int height: int @dataclass(frozen=True) class Args: """Parsed CLI arguments, as a typed alternative to ``argparse.Namespace``. Attributes: index: 1-based index into the per-output workspace list. shift: If true, move the focused container to the target workspace. keep_with_it: If true *and* ``shift`` is true, also follow the container to the new workspace. debug: If true, print parsed intermediate state to stderr and re-raise exceptions instead of producing a friendly message. """ index: int shift: bool keep_with_it: bool debug: bool _INCLUDE_RE = re.compile(r"^\s*include\s+(\S.*?)\s*$", re.MULTILINE) def expand_i3_includes( config_text: str, base_dir: pathlib.Path, _visited: set[pathlib.Path] | None = None, ) -> str: """Inline all i3 ``include`` directives in ``config_text``, recursively. The i3 IPC ``get_config`` reply returns the raw contents of the loaded config file, *not* the post-include-expanded view that i3 itself acts on. Without this expansion, any workspace/output bindings stashed in included files (which is exactly what ``pi3-smart-workspace init`` generates) would be invisible to :func:`parse_workspaces_per_output`. The expansion mirrors i3's ``wordexp(3)`` semantics for the common case: leading ``~`` is expanded to ``$HOME``, relative paths are resolved against the directory of the file containing the include, and shell-style globs (``*``, ``?``, ``[…]``) expand alphabetically. Quoted paths and environment variables aren't handled. Files that fail to read (missing, permission denied) are silently skipped — i3 itself tolerates that. Include cycles are broken by tracking already-visited resolved paths. Args: config_text: Raw i3 config text to expand. base_dir: Directory used to resolve relative include paths. _visited: Internal recursion guard. Do not pass. Returns: ``config_text`` with every ``include`` line replaced by the contents of the matched files (themselves recursively expanded). Non-include text is preserved verbatim. """ if _visited is None: _visited = set() out: list[str] = [] last = 0 for match in _INCLUDE_RE.finditer(config_text): out.append(config_text[last : match.start()]) raw_path = os.path.expanduser(match.group(1)) if not os.path.isabs(raw_path): raw_path = str(base_dir / raw_path) for matched in sorted(glob.glob(raw_path)): path = pathlib.Path(matched) real = path.resolve() if real in _visited: continue _visited.add(real) try: sub_text = path.read_text() except OSError: continue out.append(expand_i3_includes(sub_text, path.parent, _visited)) last = match.end() out.append(config_text[last:]) return "".join(out) def parse_workspaces_per_output( config_text: str, active_outputs: Iterable[str] ) -> dict[str, list[str]]: """Extract per-output workspace lists from an i3 config. The function expects the user's i3 config to follow the three-part convention documented in ``README.md``:: set $myOutput HDMI-A-0 # output alias set $ws1 1:web # workspace-name alias workspace $ws1 output $myOutput # binding Three regexes are applied to ``config_text``: 1. ``set $ `` — only matches output names found in ``active_outputs`` (the regex is anchored and the alternation is built from the escaped active names, so unplugged or unknown outputs are silently ignored). 2. ``set $ `` — workspace-name aliases. The value must start with a digit, which avoids accidentally matching unrelated ``set`` lines such as ``set $mod Mod4``. 3. ``workspace $ output $`` — bindings that join the two above. Bindings whose output variable isn't in the active map are skipped. Args: config_text: The raw i3 config text, as returned by ``i3.get_config().config``. active_outputs: Names of outputs that are currently active (from ``i3.get_outputs()`` with ``o.active == True``). Returns: A dict mapping each active output name to the list of workspace names declared for it, in the order their ``workspace … output …`` lines appear in the config. Outputs without any bindings are omitted; ``active_outputs`` being empty yields ``{}``. """ active = set(active_outputs) if not active: return {} # Build the output-name alternation from the *escaped* active names so # that regex metacharacters in output names (e.g. ``DP-1``, ``HDMI-A-0``) # don't trip up the match. output_pattern = "|".join(re.escape(name) for name in active) output_alias_re = re.compile( rf"^\s*set\s+(\$\w+)\s+({output_pattern})\s*$", re.MULTILINE ) # Workspace values can be unquoted (``1:1``) or quoted (``"1"``, # ``"1: web"``). Both forms must start with a digit, which keeps us # from accidentally matching unrelated ``set`` lines like # ``set $mod Mod4`` or ``set $term i3-sensible-terminal``. workspace_alias_re = re.compile( r'^\s*set\s+(\$\w+)\s+(?:"(\d[^"]*)"|(\d\S*))\s*$', re.MULTILINE ) binding_re = re.compile( r"^\s*workspace\s+(\$\w+)\s+output\s+(\$\w+)\s*$", re.MULTILINE ) output_aliases = {m.group(1): m.group(2) for m in output_alias_re.finditer(config_text)} # Quoted form lives in group 2 (without surrounding quotes), unquoted in # group 3. Whichever branch matched, the other group is ``None``. workspace_aliases = { m.group(1): m.group(2) if m.group(2) is not None else m.group(3) for m in workspace_alias_re.finditer(config_text) } result: dict[str, list[str]] = {} for match in binding_re.finditer(config_text): ws_var, out_var = match.group(1), match.group(2) # Either side of the binding may reference a variable we didn't # match (e.g. an unplugged output) — drop the binding silently. if out_var not in output_aliases or ws_var not in workspace_aliases: continue output = output_aliases[out_var] result.setdefault(output, []).append(workspace_aliases[ws_var]) return result def find_output_at_cursor( outputs: Iterable[tuple[str, Rect]], cursor: tuple[int, int] ) -> str | None: """Find which output's rect contains a given cursor position. Adjacent outputs in a multi-monitor setup share an edge, so a cursor that lands exactly on a shared edge would otherwise belong to two outputs at once. The rule applied here is: the lower bound on each axis is *inclusive only at coordinate 0* (the screen origin) and *exclusive* otherwise. The effect is that a cursor on a shared edge is attributed to the top/left output, never to both. Args: outputs: Iterable of ``(name, rect)`` pairs for the active outputs. cursor: ``(x, y)`` cursor position in screen coordinates. Returns: The name of the first output whose rect contains the cursor, or ``None`` if no output does (e.g. the cursor is on a region not covered by any monitor). """ x, y = cursor for name, r in outputs: x_in = (r.x <= x if r.x == 0 else r.x < x) and x <= r.x + r.width y_in = (r.y <= y if r.y == 0 else r.y < y) and y <= r.y + r.height if x_in and y_in: return name return None def _active_outputs(i3: i3ipc.Connection) -> list[tuple[str, Rect]]: """Return ``(name, rect)`` for every currently active i3 output. Inactive outputs (disconnected monitors that i3 still remembers) are filtered out. Args: i3: A connected ``i3ipc.Connection``. Returns: A list of ``(output_name, Rect)`` tuples in the order i3 returns them — typically primary-first, but this is not guaranteed. """ return [ (o.name, Rect(o.rect.x, o.rect.y, o.rect.width, o.rect.height)) for o in i3.get_outputs() if o.active ] def run(args: Args) -> None: """Execute one workspace switch / shift based on ``args``. This is the side-effectful counterpart to the pure helpers above: it talks to i3 over IPC, asks the X server for the cursor position via pynput, and issues the resulting i3 commands. Args: args: Parsed CLI arguments. Raises: RuntimeError: If no active output contains the cursor, or if ``args.index`` is outside the range of workspaces declared for the cursor's output. """ i3 = i3ipc.Connection() outputs = _active_outputs(i3) if args.debug: pprint.pprint({"active_outputs": outputs}) config_text = i3.get_config().config # i3.get_config() returns the *raw* file contents — any `include …` # directives must be expanded by us before the parser sees them, or # workspace bindings stashed in included files (the layout `init` # generates) will be invisible. loaded_path = pathlib.Path(i3.get_version().loaded_config_file_name or "") config_base = loaded_path.parent if loaded_path.name else pathlib.Path.home() / ".config/i3" config_text = expand_i3_includes(config_text, config_base) workspaces_per_output = parse_workspaces_per_output( config_text, (name for name, _ in outputs) ) if args.debug: pprint.pprint({"workspaces_per_output": workspaces_per_output}) # pynput touches the X display on import, so keep it lazy: importing # ``pi3.smart_workspace`` itself must work without ``$DISPLAY`` (CI, # unit tests, ``pip install`` post-install hooks, etc.). from pynput.mouse import Controller as MouseController cursor = MouseController().position output_name = find_output_at_cursor(outputs, (int(cursor[0]), int(cursor[1]))) if output_name is None: raise RuntimeError(f"no active output contains cursor position {cursor}") workspaces = workspaces_per_output.get(output_name, []) if not 1 <= args.index <= len(workspaces): raise RuntimeError( f"index {args.index} out of range; output {output_name!r} has " f"{len(workspaces)} workspaces declared" ) target = workspaces[args.index - 1] if args.shift: i3.command(f"move container to workspace {target}") if not args.keep_with_it: return i3.command(f"workspace {target}") # --------------------------------------------------------------------------- # init subcommand # --------------------------------------------------------------------------- _AUTOGEN_HEADER = ( "# Auto-generated by `pi3-smart-workspace init`.\n" "# Re-run the command to regenerate after plugging/unplugging monitors.\n" ) def sanitize_var_suffix(output_name: str) -> str: """Turn an i3 output name into a valid i3 variable-name suffix. i3 variables must be alphanumeric/underscore (matching ``\\$\\w+``), but output names commonly contain hyphens (``eDP-1``, ``HDMI-A-0``). All non-alphanumeric characters become underscores. Args: output_name: The output name as reported by i3 (e.g. ``"HDMI-A-0"``). Returns: A string usable as the tail of an i3 ``$var`` (e.g. ``"HDMI_A_0"``). """ return re.sub(r"[^a-zA-Z0-9]", "_", output_name) def render_workspaces_for_output( output_name: str, global_start: int, count: int ) -> str: """Render the ``workspaces.conf`` content for a single output. The output gets a contiguous block of global workspace numbers ``[global_start, global_start + count)``. The label after the colon is the *local* index (1..count) so the i3bar shows e.g. ``9:1`` for the first workspace of a secondary monitor — meaning "global #9, locally the 1st on this output". Args: output_name: Active output name from ``i3.get_outputs()``. global_start: First global workspace number to allocate. Must be >= 1. count: How many workspaces to declare for this output. Must be >= 1. Returns: The full file contents, including a leading comment header and a trailing newline. """ if global_start < 1: raise ValueError("global_start must be >= 1") if count < 1: raise ValueError("count must be >= 1") var = sanitize_var_suffix(output_name) out_var = f"$out_{var}" ws_vars = [f"$ws_{var}_{local}" for local in range(1, count + 1)] lines: list[str] = [ _AUTOGEN_HEADER, f"# Output: {output_name}\n", f"set {out_var} {output_name}\n", "\n", ] for local, ws_var in enumerate(ws_vars, start=1): global_num = global_start + local - 1 lines.append(f"set {ws_var} {global_num}:{local}\n") lines.append("\n") for ws_var in ws_vars: lines.append(f"workspace {ws_var} output {out_var}\n") return "".join(lines) def render_bindings(count: int, mod: str = "$mod") -> str: """Render the shared ``bindings.conf`` content. Generates three blocks of ``count`` keybindings each: * ``mod+N`` → switch to the N-th workspace on the cursor's output * ``mod+Shift+N`` → move focused container to that workspace * ``mod+Ctrl+N`` → move container *and* follow it The ``mod`` variable must already be defined elsewhere in the user's i3 config (i3's stock template defines ``set $mod Mod4``). Args: count: Number of bindings per block (typically 8, matching the number of workspaces per output). mod: The mod-key i3 variable to bind against. Default ``"$mod"``. Returns: The full file contents, including a leading comment header and a trailing newline. """ if count < 1: raise ValueError("count must be >= 1") def block(suffix: str, flags: str, comment: str) -> list[str]: rows = [f"# {comment}\n"] for n in range(1, count + 1): rows.append( f"bindsym {mod}+{suffix}{n} exec --no-startup-id " f"pi3-smart-workspace -i {n}{flags}\n" ) rows.append("\n") return rows out: list[str] = [_AUTOGEN_HEADER, "\n"] out.extend(block("", "", "Switch to workspace")) out.extend(block("Shift+", " -s", "Move focused container to workspace")) out.extend(block("Ctrl+", " -s -k", "Move container to workspace and follow it")) return "".join(out) @dataclass(frozen=True) class InitArgs: """Parsed CLI arguments for the ``init`` subcommand.""" count: int config_dir: pathlib.Path dry_run: bool force: bool debug: bool def _planned_files( config_dir: pathlib.Path, outputs: list[tuple[str, Rect]], count: int, ) -> list[tuple[pathlib.Path, str]]: """Compute the (path, content) pairs that ``init`` would write. Pure function: no filesystem or i3 access. Each output gets a folder ``//workspaces.conf``; the shared keybindings go in ``/bindings.conf``. Output order is preserved from ``outputs`` so global workspace numbers are deterministic. """ files: list[tuple[pathlib.Path, str]] = [ (config_dir / "bindings.conf", render_bindings(count)), ] for index, (output_name, _) in enumerate(outputs): global_start = index * count + 1 files.append( ( config_dir / output_name / "workspaces.conf", render_workspaces_for_output(output_name, global_start, count), ) ) return files def run_init(args: InitArgs) -> None: """Generate per-output i3 workspace configs for the currently-active outputs. Args: args: Parsed init-subcommand arguments. Raises: RuntimeError: If no active outputs are detected, or if files already exist and ``--force`` was not passed. """ i3 = i3ipc.Connection() outputs = _active_outputs(i3) if not outputs: raise RuntimeError("no active outputs detected by i3") config_dir = args.config_dir.expanduser() files = _planned_files(config_dir, outputs, args.count) if args.dry_run: for path, content in files: print(f"--- {path} ---") print(content, end="") return existing = [path for path, _ in files if path.exists()] if existing and not args.force: listing = "\n ".join(str(p) for p in existing) raise RuntimeError( "refusing to overwrite existing files (pass --force to replace):\n " + listing ) for path, content in files: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content) print(f"wrote {path}") print() print("Add the following to your i3 config (typically ~/.config/i3/config):") print(f" include {config_dir}/bindings.conf") print(f" include {config_dir}/*/workspaces.conf") print() print("Then reload i3 with $mod+Shift+r.") def _init_main(argv: list[str]) -> None: parser = argparse.ArgumentParser( prog="pi3-smart-workspace init", description="Generate per-output i3 workspace configs and keybindings.", ) parser.add_argument( "-n", "--count", type=int, default=DEFAULT_WORKSPACES_PER_OUTPUT, help=f"Workspaces per output (default: {DEFAULT_WORKSPACES_PER_OUTPUT}).", ) parser.add_argument( "--config-dir", type=pathlib.Path, default=DEFAULT_INIT_DIR, help=f"Where to write generated configs (default: {DEFAULT_INIT_DIR}).", ) parser.add_argument( "--dry-run", action="store_true", help="Print generated configs to stdout instead of writing them.", ) parser.add_argument( "--force", action="store_true", help="Overwrite existing files in the config directory.", ) parser.add_argument( "-d", "--debug", action="store_true", help="Re-raise exceptions instead of printing a friendly message.", ) parsed = parser.parse_args(argv) init_args = InitArgs( count=parsed.count, config_dir=parsed.config_dir, dry_run=parsed.dry_run, force=parsed.force, debug=parsed.debug, ) try: run_init(init_args) except Exception as exc: if init_args.debug: raise print(f"pi3-smart-workspace init: {exc}", file=sys.stderr) sys.exit(1) # --------------------------------------------------------------------------- # main entry point # --------------------------------------------------------------------------- def main() -> None: """Entry point for the ``pi3-smart-workspace`` console script. If invoked as ``pi3-smart-workspace init [...]`` the call is routed to :func:`_init_main`. Otherwise it parses ``sys.argv`` into an :class:`Args` instance and dispatches to :func:`run`. Exceptions are caught and printed as a single-line error on stderr (exit code 1), unless ``--debug`` is set, in which case the traceback is re-raised. """ if len(sys.argv) > 1 and sys.argv[1] == "init": _init_main(sys.argv[2:]) return parser = argparse.ArgumentParser( description="Switch (or shift) to the N-th workspace on the output your cursor is on." ) parser.add_argument( "-d", "--debug", action="store_true", help="Print parsed state and re-raise exceptions.", ) required = parser.add_argument_group("Required") required.add_argument( "-i", "--index", type=int, required=True, help="The 1-based index into the workspaces declared for the cursor's output.", ) shift_group = parser.add_argument_group("Shift", "manipulate the active window") shift_group.add_argument( "-s", "--shift", action="store_true", help="Move the focused container to the indexed workspace.", ) shift_group.add_argument( "-k", "--keep-with-it", action="store_true", help="With --shift, also follow the container to the new workspace.", ) parsed = parser.parse_args() args = Args( index=parsed.index, shift=parsed.shift, keep_with_it=parsed.keep_with_it, debug=parsed.debug, ) try: run(args) except Exception as exc: if args.debug: raise print(f"pi3-smart-workspace: {exc}", file=sys.stderr) sys.exit(1) if __name__ == "__main__": main()