Source code for pyvisor.manual_ethology_scorer_2

import os
from typing import Dict, List, Tuple, Union
import threading

import numpy as np
import pygame
from PIL import Image

from .GUI.model.animal import AnimalNumber, Animal
from .GUI.model.movie_bindings import MovieBindings
from .MediaHandler import MediaHandler
from .ethogram import Ethogram
from .user_input_control import UserInputControl2
from .resources import resource_path
from . import dataIO
from .paths import ensure_autosave_dir

this_files_directory = os.path.dirname(os.path.realpath(__file__))


[docs] class ManualEthologyScorer2: """Real-time video annotation engine. Opens a pygame window showing the video with behaviour icon overlays. Processes gamepad/keyboard input to toggle behaviour states and records them frame-by-frame into an :class:`Ethogram`. Supports: - Sidecar-based session persistence (auto-save/resume) - Autosave to a configurable directory - F1-toggleable key binding overlay - Visual distinction between live and recorded annotations """ def __init__(self, animals: Dict[AnimalNumber, Animal], movie_bindings: MovieBindings, selected_device: str, autosave_settings: Union[Dict[str, Union[bool, int, str]], None] = None): self.animals = animals self.movie_bindings = movie_bindings self.selected_device = selected_device self._icon_columns = [0, 96, 192, 288, 384, 480] self._icon_rows = [] # type: List[int] self._icon_positions = { an: [] for an in self.animals } # type: Dict[AnimalNumber, List[Tuple[int, int]]] self.window = pygame.display self._delete_icon = None self.ethogram = None self.movie = None # type: Union[None, MediaHandler] self.user_input_control = None # type: Union[None, UserInputControl2] self._autosave_settings = { 'enabled': True, 'interval_seconds': 300, 'directory': '' } if autosave_settings: self.autosave_settings = autosave_settings else: self.autosave_settings = {} self._ethogram_lock = threading.RLock() self.dio = dataIO.dataIO(self)
[docs] def go(self): if self.movie is None: raise RuntimeError("Movie has to be loaded before scorer can be run!") # Re-initialize pygame fully (needed for second+ runs after pygame.quit) pygame.init() pygame.joystick.init() for i in range(pygame.joystick.get_count()): joy = pygame.joystick.Joystick(i) joy.init() pygame.time.Clock() # setup icons icon = self.image2surf(str(resource_path("gamethogram_64.png"))) self._delete_icon = self.image2surf(str(resource_path("icons", "game", "del.png"))) self.window.set_icon(icon) self.window.set_caption("GameThogram — " + self.movie.fileName) self._adjust_window_size() self.movie.activeFrame = -1 self._show_overlay = True # show key binding overlay on start self.dio.autosave() analysing = True while analysing: analysing = self._loop() self.dio.stop_autosave() self.save_sidecar() # persist session for seamless resume pygame.quit()
def _loop(self) -> bool: for event in pygame.event.get(): if event.type == pygame.QUIT: return False if event.type == pygame.JOYBUTTONDOWN: input_code = 'B' + str(event.button) self.user_input_control.handle_input(input_code) if event.type == pygame.JOYAXISMOTION: self._handle_event_joyaxismotion(event) if event.type == pygame.JOYHATMOTION: self._handle_event_joyhatmotion(event) if event.type == pygame.KEYDOWN: if event.key == pygame.K_F1: self._show_overlay = not self._show_overlay else: input_code = event.unicode self.user_input_control.handle_input(input_code) self.refresh_media() with self.ethogram.lock: self.ethogram.apply_states_at_frame(self.movie.frameNo) return True def _handle_event_joyhatmotion(self, event): value = event.dict['value'] input_code = 'H' + str(value[0]) + str(value[1]) if input_code == 'H00': return self.user_input_control.handle_input(input_code) def _handle_event_joyaxismotion(self, event): value = event.dict['value'] axis = event.dict['axis'] input_code = 'A' + str(axis) try: if value < -0.3: input_code = input_code + '-' self.user_input_control.handle_input(input_code) elif value > 0.3: input_code += '+' self.user_input_control.handle_input(input_code) except KeyError: pass
[docs] def load_movie(self, filename: str, media_type: str): pygame.init() # Normalise GUI labels to internal media type keys _type_map = { 'movie': 'movie', 'norpix': 'norpix', 'image': 'image', 'imagesequence': 'image', } media_type = _type_map.get(media_type.lower(), media_type.lower()) if media_type not in ('movie', 'norpix', 'image'): raise KeyError("media_type '{}' is not supported.".format(media_type)) self.movie = MediaHandler(filename, media_type) self.ethogram = Ethogram(self.animals, self.movie.length, self._ethogram_lock) self.user_input_control = UserInputControl2(self.animals, self.movie_bindings, self.selected_device, self.movie, self.ethogram) # Try to restore previous session from sidecar self._load_sidecar()
# ── Sidecar persistence ───────────────────────────────────── def _sidecar_path(self) -> str: """Return the sidecar file path: <video_path>.pyvisor.pkl""" if self.movie is None: return "" return self.movie.fileName + ".pyvisor.pkl"
[docs] def save_sidecar(self): """Persist the current ethogram to a sidecar file next to the video.""" import pickle as _pickle path = self._sidecar_path() if not path: return data = self.get_data() labels = self.get_labels() if data is False or data is None: return try: with open(path, 'wb') as fh: _pickle.dump({ 'data': data, 'labels': labels, 'n_frames': self.movie.length, 'fps': self.movie._movie_fps, 'media_file': self.movie.fileName, }, fh, protocol=_pickle.HIGHEST_PROTOCOL) print("Sidecar saved: {}".format(path)) except Exception as exc: print("Failed to save sidecar: {}".format(exc))
def _load_sidecar(self): """Load a previous session's ethogram from the sidecar file.""" import pickle as _pickle path = self._sidecar_path() if not path: return try: with open(path, 'rb') as fh: session = _pickle.load(fh) except FileNotFoundError: return except Exception as exc: print("Could not load sidecar: {}".format(exc)) return saved_data = session.get('data') saved_labels = session.get('labels', []) if saved_data is None or len(saved_labels) == 0: return # Check frame count matches n_frames_saved = saved_data.shape[0] if n_frames_saved != self.movie.length: print("Sidecar frame count mismatch ({} vs {}), skipping.".format( n_frames_saved, self.movie.length)) return # Build a label → column index map from the saved data # Saved labels are like "animal_0 : aggression" # We need to map to ethogram column labels like "A0_aggression" saved_col_map = {} for i, lbl in enumerate(saved_labels): saved_col_map[lbl] = i with self.ethogram.lock: for an in sorted(self.ethogram.animal_ethograms.keys()): etho = self.ethogram.animal_ethograms[an] animal = self.animals[an] for col_name in etho._table.columns: if col_name.endswith('_delete'): continue # Reconstruct the formatted label to match saved labels behav = animal.behaviours.get(col_name) if behav is None: continue formatted = "{} : {}".format(animal.name, behav.name) if formatted in saved_col_map: src_col = saved_col_map[formatted] etho._table[col_name] = saved_data[:, src_col].astype(bool) print("Restored previous session from sidecar: {}".format(path)) def _adjust_window_size(self): height = self.movie.height width = self.movie.width if width < 576: self.movie_window_offset = int((576 - width) / 2.0) width = 576 else: self.movie_window_offset = 0 height = height + 288 self.screen = self.window.set_mode((int(width), int(height))) self._define_icon_positions() self._try_auto_assign_icon_positions()
[docs] @staticmethod def image2surf(fPos): img = Image.open(fPos).convert('RGBA') mode = img.mode size = img.size data = img.tobytes() return pygame.image.frombytes(data, size, mode)
def _define_icon_positions(self): self._icon_rows = [0, 96, self.screen.get_height() - 96, self.screen.get_height() - 144] def _try_auto_assign_icon_positions(self): if len(self.animals) > 4: raise RuntimeError("Automatically assigning Icon positions currently works only" " for up to 4 animals.") for i, an in enumerate(self.animals.keys()): ypos = self._icon_rows[i] for xpos in self._icon_columns: self._icon_positions[an].append((ypos, xpos))
[docs] def refresh_media(self): frame = self.movie.get_frame() movie_screen = pygame.surfarray.make_surface(np.rot90(frame)) self.screen.fill((0, 0, 0)) self.screen.blit(movie_screen, (self.movie_window_offset, 144)) self._update_icons() self._update_text() if self._show_overlay: self._draw_bindings_overlay() pygame.display.update()
def _update_icons(self): with self.ethogram.lock: for an in self.animals: self._update_icons_of_animal(an) def _update_icons_of_animal(self, an: int): animal_etho = self.ethogram.animal_ethograms[an] live_state = set(self.ethogram.current_states[an]) recorded = set(animal_etho.get_active_labels_at_frame(self.movie.frameNo)) # Check for delete in the live state delete_label = 'A{}_delete'.format(an) if delete_label in live_state: pos = self._icon_positions[an][0] if self._icon_positions[an] else (0, 0) self.screen.blit(self._delete_icon, pos) return # Collect all labels to display, tracking which are live vs recorded-only all_labels = [] for lbl in recorded: if not lbl.endswith('_delete'): all_labels.append(lbl) for lbl in live_state: if not lbl.endswith('_delete') and lbl not in all_labels: all_labels.append(lbl) if not all_labels: return icons = animal_etho.get_icons(all_labels) positions = self._icon_positions[an][:len(icons)] for icon_surf, pos, label in zip(icons, positions, all_labels): is_live = label in live_state is_recorded = label in recorded if is_live and is_recorded: # Both: full icon + golden ring self.screen.blit(icon_surf, pos) self._draw_golden_ring(pos, icon_surf.get_size()) elif is_live: # Active only: full icon + golden ring self.screen.blit(icon_surf, pos) self._draw_golden_ring(pos, icon_surf.get_size()) else: # Recorded only: semi-transparent tmp = icon_surf.copy() tmp.set_alpha(100) self.screen.blit(tmp, pos) def _draw_golden_ring(self, pos, size): """Draw a golden border rectangle around an icon.""" rect = pygame.Rect(pos[0], pos[1], size[0], size[1]) pygame.draw.rect(self.screen, (255, 215, 0), rect, 3) def _update_text(self): myfont = pygame.font.SysFont(pygame.font.get_default_font(), 15) label = myfont.render("frame: " + str(self.movie.frameNo), 1, (255, 255, 0)) label2 = myfont.render("time: " + str(self.movie.get_time()) + ' s', 1, (255, 255, 0)) label3 = myfont.render("replay-fps: " + str(self.movie.fps), 1, (255, 255, 0)) self.screen.blit(label, (self.movie_window_offset + 10, self.movie.height - 45 + 144)) self.screen.blit(label2, (self.movie_window_offset + 10, self.movie.height - 30 + 144)) self.screen.blit(label3, (self.movie_window_offset + 10, self.movie.height - 15 + 144)) def _draw_bindings_overlay(self): """Draw a semi-transparent overlay listing all current key bindings. Toggle with F1. """ font = pygame.font.SysFont(pygame.font.get_default_font(), 14) line_h = 18 pad = 10 lines = [] # Behaviour bindings per animal for an in sorted(self.animals.keys()): animal = self.animals[an] lines.append(("--- {} (A{}) ---".format(animal.name, an), (200, 200, 255))) for label in sorted(animal.behaviours.keys()): behav = animal.behaviours[label] binding = behav.key_bindings[self.selected_device] btn_str = binding if binding else "?" lines.append((" {} = {}".format(btn_str, behav.name), (255, 255, 255))) # Movie bindings lines.append(("--- movie controls ---", (200, 200, 255))) for action_name in sorted(self.movie_bindings.keys()): action = self.movie_bindings[action_name] binding = action.key_bindings[self.selected_device] btn_str = binding if binding else "?" lines.append((" {} = {}".format(btn_str, action_name), (255, 255, 200))) lines.append(("", (0, 0, 0))) lines.append((" [F1] toggle this overlay", (180, 180, 180))) # Draw semi-transparent background overlay_w = 280 overlay_h = len(lines) * line_h + pad * 2 overlay_surface = pygame.Surface((overlay_w, overlay_h)) overlay_surface.set_alpha(180) overlay_surface.fill((20, 20, 40)) x = self.screen.get_width() - overlay_w - 10 y = 150 self.screen.blit(overlay_surface, (x, y)) # Render text lines for i, (text, color) in enumerate(lines): rendered = font.render(text, True, color) self.screen.blit(rendered, (x + pad, y + pad + i * line_h)) @property def autosave_settings(self): return self._autosave_settings @autosave_settings.setter def autosave_settings(self, settings: Dict[str, Union[bool, int, str]]): self._autosave_settings.update(settings) self._autosave_settings['interval_seconds'] = int( max(1, int(self._autosave_settings.get('interval_seconds', 300))) ) if not self._autosave_settings.get('directory'): self._autosave_settings['directory'] = str(ensure_autosave_dir())
[docs] def get_data(self): if self.ethogram is None: return False with self.ethogram.lock: matrices = [] for an in sorted(self.ethogram.animal_ethograms.keys()): etho = self.ethogram.animal_ethograms[an] matrix = etho.to_numpy() labels = [col for col in etho._table.columns] if matrix.size == 0: continue # Filter out delete columns keep = [i for i, lbl in enumerate(labels) if not lbl.endswith('_delete')] if keep: matrices.append(matrix[:, keep]) if not matrices: return False if len(matrices) == 1: return matrices[0] return np.hstack(matrices)
[docs] def get_labels(self) -> List[str]: if self.ethogram is None: return [] labels: List[str] = [] with self.ethogram.lock: for an in sorted(self.ethogram.animal_ethograms.keys()): all_labels = self.ethogram.animal_ethograms[an].get_formatted_behaviour_labels() all_cols = list(self.ethogram.animal_ethograms[an]._table.columns) for col, lbl in zip(all_cols, all_labels): if not col.endswith('_delete'): labels.append(lbl) return labels
[docs] def save_data(self, fpos, mode='text'): data = self.get_data() if data is False or data is None: return behav_labels = self.get_labels() if mode == 'text': self.dio.saveAsTXT(fpos, data, behav_labels) elif mode == 'xlsx': self.dio.saveAsXLSX(fpos, data, behav_labels) elif mode == 'matLab': self.dio.saveAsMat(fpos, data, behav_labels) elif mode == 'pickle': self.dio.saveAsPy(fpos, data) else: raise KeyError("Unknown mode for save_data: {}".format(mode))