Source code for pyvisor.dataIO

# -*- coding: utf-8 -*-
"""
Created on Wed Jun  1 13:42:07 2016

@author: bgeurten
"""
import numpy as np
import scipy.io as sio
from time import sleep
import threading
from datetime import datetime
#import pandas as pd
import pickle, xlsxwriter, pygame,os

[docs] class dataIO: """Data input/output handler for ethogram data. Supports saving annotations as plain text, Excel, MATLAB, and pickle formats. Also handles autosave with periodic background snapshots and overlay frame/video export. """ def __init__(self,parent): self.autoSavePath = '' self.saveFpos = '' self.loadFpos = '' self.parent = parent self._autosave_thread = None self._autosave_stop_event = threading.Event() self._autosave_lock = threading.Lock() self._autosave_config = { 'enabled': False, 'interval': 300, 'output_path': '' }
[docs] def autosave(self): """Start, update, or stop the autosave worker. Autosave parameters are provided by the parent scorer via the ``autosave_settings`` attribute. The expected schema is:: { "enabled": bool, "interval_seconds": int, "directory": str, } The worker writes ``autosave_latest.pkl`` and ``autosave_latest.txt`` into the configured directory at the given interval. When autosave is disabled any existing worker is shut down gracefully. """ settings = getattr(self.parent, 'autosave_settings', None) if settings is None: return enabled = bool(settings.get('enabled', False)) interval = int(settings.get('interval_seconds', 300)) output_path = settings.get('directory', '') or '' snapshot_now = False with self._autosave_lock: self._autosave_config.update({ 'enabled': enabled, 'interval': max(1, interval), 'output_path': output_path }) if not enabled: self._stop_autosave_locked() return if not self._autosave_thread or not self._autosave_thread.is_alive(): self._autosave_stop_event.clear() self._autosave_thread = threading.Thread(target=self._autosave_worker, name='pyvisor-autosave', daemon=True) self._autosave_thread.start() else: snapshot_now = True if snapshot_now: self._write_autosave_snapshot()
[docs] def stop_autosave(self): with self._autosave_lock: self._autosave_config['enabled'] = False self._stop_autosave_locked()
def _stop_autosave_locked(self): if self._autosave_thread and self._autosave_thread.is_alive(): self._autosave_stop_event.set() self._autosave_thread.join(timeout=1.0) self._autosave_thread = None self._autosave_stop_event = threading.Event() def _autosave_worker(self): # Perform an immediate snapshot followed by periodic updates. while True: try: self._write_autosave_snapshot() except Exception as exc: # pragma: no cover - log and continue print('Autosave failed: {}'.format(exc)) interval = self._autosave_config.get('interval', 300) if self._autosave_stop_event.wait(interval): break def _write_autosave_snapshot(self): with self._autosave_lock: if not self._autosave_config.get('enabled', False): return directory = self._autosave_config.get('output_path') if not directory: return os.makedirs(directory, exist_ok=True) data = self.parent.get_data() if data is False or data is None: return labels = [] if hasattr(self.parent, 'get_labels'): labels = self.parent.get_labels() or [] timestamp = datetime.utcnow().strftime('%Y%m%d-%H%M%S') base_path = os.path.join(directory, 'autosave_latest') timestamp_base = os.path.join(directory, f'autosave_{timestamp}') self.saveAsPy(base_path + '.pkl', data) self.saveAsTXT(base_path + '.txt', data, labels) # Keep a timestamped copy to make it easy to revisit older states. self.saveAsPy(timestamp_base + '.pkl', data)
[docs] def saveAsTXT(self,fpos,data,behavLabels): headStr = '' for i in range(len(behavLabels)): headStr = headStr + 'COL' + str(i+1) + ': '+ str(behavLabels[i]) + '\n' np.savetxt(fpos,data,fmt='%2i',header= headStr)
[docs] def saveAsMat(self,fpos,data,behavLabels): info = '' for i in range(len(behavLabels)): info = info + 'COL' + str(i+1) + ': ' + str(behavLabels[i]) + '\n' sio.savemat(fpos, {'data':data,'info':info})
[docs] def saveAsXLSX(self,fpos,data,behavLabels): # Create a workbook and add a worksheet. workbook = xlsxwriter.Workbook(fpos) worksheet = workbook.add_worksheet() # Start from the first cell. Rows and columns are zero indexed. row = 0 col = 0 # Iterate over the data and write it out row by row. for label in (behavLabels): worksheet.write(row, col, label) col += 1 for rowI in range(data.shape[0]): for colI in range( data.shape[1]): worksheet.write(rowI+1,colI,data[rowI,colI]) workbook.close()
[docs] def saveOverlayMovie(self, fPos, prefix='frame', extension='mp4'): """Export the scored video as an actual video file with icon overlays. If *extension* is 'mp4' or 'avi', a proper video file is written using PyAV. For image formats ('png', 'jpeg', 'bmp', 'tga') a numbered image sequence is written instead. """ scorer = self.parent if not hasattr(scorer, 'screen') or scorer.screen is None: print("saveOverlayMovie: scorer screen not available") return movLen = scorer.movie.length fPos = str(fPos) prefix = str(prefix) extension = str(extension) is_video = extension.lower() in ('mp4', 'avi', 'mkv', 'mov') if is_video: self._export_as_video(scorer, fPos, prefix, extension, movLen) else: self._export_as_image_sequence(scorer, fPos, prefix, extension, movLen)
def _export_as_video(self, scorer, dPos, prefix, extension, movLen): """Write an mp4/avi video with overlays using PyAV.""" try: import av as _av except ImportError: print("PyAV is required for video export. Install with: pip install av") return out_path = os.path.join(dPos, "{}.{}".format(prefix, extension)) width = scorer.screen.get_width() height = scorer.screen.get_height() fps = int(scorer.movie._movie_fps) container = _av.open(out_path, mode='w') stream = container.add_stream('h264', rate=fps) stream.width = width stream.height = height stream.pix_fmt = 'yuv420p' for i in range(movLen - 1): surface = self._render_frame(scorer, i) # Convert pygame surface → numpy array (H, W, 3) arr = pygame.surfarray.array3d(surface) # pygame gives (W, H, 3) transposed — fix it arr = np.transpose(arr, (1, 0, 2)) frame = _av.VideoFrame.from_ndarray(arr, format='rgb24') for packet in stream.encode(frame): container.mux(packet) if i % 100 == 0: print("Exporting frame {}/{}".format(i, movLen)) # Flush for packet in stream.encode(): container.mux(packet) container.close() print("Video exported: {}".format(out_path)) def _export_as_image_sequence(self, scorer, dPos, prefix, extension, movLen): """Write numbered image files with overlays.""" digitNum = len(str(movLen)) for i in range(movLen - 1): fName = '{}_{}.{}'.format(prefix, str(i).zfill(digitNum), extension) fPath = os.path.join(dPos, fName) surface = self._render_frame(scorer, i) pygame.image.save(surface, str(fPath)) if i % 100 == 0: print("Exporting frame {}/{}".format(i, movLen)) print("Image sequence exported to: {}".format(dPos))
[docs] def saveOverlayImage(self, fPos, targetFrame=37): """Save a single frame with behaviour icon overlays.""" scorer = self.parent if not hasattr(scorer, 'screen') or scorer.screen is None: print("saveOverlayImage: scorer screen not available") return surface = self._render_frame(scorer, targetFrame) pygame.image.save(surface, str(fPos)) print("Frame {} saved to: {}".format(targetFrame, fPos))
@staticmethod def _render_frame(scorer, frame_number): """Render a single frame with overlays to a new surface.""" from PIL import Image as _Image # Get the raw video frame raw = scorer.movie.getFrame(frame_number) raw = _Image.fromarray(raw).convert('RGB') movie_screen = pygame.surfarray.make_surface(np.rot90(raw)) # Draw onto the scorer's screen scorer.screen.fill((0, 0, 0)) scorer.screen.blit(movie_screen, (scorer.movie_window_offset, 144)) # Draw icons for this frame if scorer.ethogram is not None: with scorer.ethogram.lock: scorer.ethogram.apply_states_at_frame(frame_number) scorer._update_icons() # Draw frame info text myfont = pygame.font.SysFont(pygame.font.get_default_font(), 15) label = myfont.render("frame: " + str(frame_number), 1, (255, 255, 0)) label2 = myfont.render("time: {:.1f} s".format( frame_number / scorer.movie._movie_fps), 1, (255, 255, 0)) scorer.screen.blit(label, (scorer.movie_window_offset + 10, scorer.movie.height - 45 + 144)) scorer.screen.blit(label2, (scorer.movie_window_offset + 10, scorer.movie.height - 30 + 144)) # Return a copy of the surface return scorer.screen.copy()
[docs] def saveAsPy(self,fpos,data): with open(fpos, 'wb') as handle: pickle.dump(data, handle, protocol=pickle.HIGHEST_PROTOCOL)
[docs] def loadTXT(self,fpos,animal1,animal2): # data = np.loadtxt(fpos) # animal1,animal2 = self.assignMatrix2animals(data,animal1,animal2) # return animal1,animal2 pass
[docs] def loadMAT(self,fpos,animal1,animal2): # matData = sio.loadmat(fpos) # animal1,animal2 = self.assignMatrix2animals(matData['data'],animal1,animal2) # return animal1,animal2 pass
[docs] def loadPickle(self,fpos,animals): with open(fpos, 'rb') as handle: data = pickle.load(handle) animals = self.assignMatrix2animals(data,animals) return animals
[docs] def loadXLSX(self,fpos,animal1,animal2): # df = pd.read_excel('data.xlsx', sheetname='Sheet1') # animal1,animal2 = self.assignMatrix2animals(df.values,animal1,animal2) # return animal1,animal2 pass
[docs] def assignMatrix2animals(self,data,animals): colCounter = 0 for animalI in range(len(animals)): for behavI in range(len(animals[animalI].behaviours)): animals[animalI].behaviours[behavI].ethogram = data[:,colCounter] colCounter +=1 return animals