536 lines
19 KiB
Python
536 lines
19 KiB
Python
"""Main window for the microscopy object detection application."""
|
|
|
|
import shutil
|
|
from pathlib import Path
|
|
|
|
from PySide6.QtWidgets import (
|
|
QMainWindow,
|
|
QTabWidget,
|
|
QMenuBar,
|
|
QMenu,
|
|
QStatusBar,
|
|
QMessageBox,
|
|
QWidget,
|
|
QVBoxLayout,
|
|
QLabel,
|
|
)
|
|
from PySide6.QtCore import Qt, QTimer, QSettings
|
|
from PySide6.QtGui import QAction, QKeySequence
|
|
|
|
from src.database.db_manager import DatabaseManager
|
|
from src.utils.config_manager import ConfigManager
|
|
from src.utils.logger import get_logger
|
|
from src.gui.dialogs.config_dialog import ConfigDialog
|
|
from src.gui.dialogs.delete_model_dialog import DeleteModelDialog
|
|
from src.gui.tabs.detection_tab import DetectionTab
|
|
from src.gui.tabs.training_tab import TrainingTab
|
|
from src.gui.tabs.validation_tab import ValidationTab
|
|
from src.gui.tabs.results_tab import ResultsTab
|
|
from src.gui.tabs.annotation_tab import AnnotationTab
|
|
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
class MainWindow(QMainWindow):
|
|
"""Main application window."""
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
# Initialize managers
|
|
self.config_manager = ConfigManager()
|
|
|
|
db_path = self.config_manager.get_database_path()
|
|
self.db_manager = DatabaseManager(db_path)
|
|
|
|
logger.info("Main window initializing")
|
|
|
|
# Setup UI
|
|
self.setWindowTitle("Microscopy Object Detection")
|
|
self.setMinimumSize(1200, 800)
|
|
|
|
self._create_menu_bar()
|
|
self._create_tab_widget()
|
|
self._create_status_bar()
|
|
|
|
# Restore window geometry or center window on screen
|
|
self._restore_window_state()
|
|
|
|
logger.info("Main window initialized")
|
|
|
|
def _create_menu_bar(self):
|
|
"""Create application menu bar."""
|
|
menubar = self.menuBar()
|
|
|
|
# File menu
|
|
file_menu = menubar.addMenu("&File")
|
|
|
|
settings_action = QAction("&Settings", self)
|
|
settings_action.setShortcut(QKeySequence("Ctrl+,"))
|
|
settings_action.triggered.connect(self._show_settings)
|
|
file_menu.addAction(settings_action)
|
|
|
|
file_menu.addSeparator()
|
|
|
|
exit_action = QAction("E&xit", self)
|
|
exit_action.setShortcut(QKeySequence("Ctrl+Q"))
|
|
exit_action.triggered.connect(self.close)
|
|
file_menu.addAction(exit_action)
|
|
|
|
# View menu
|
|
view_menu = menubar.addMenu("&View")
|
|
|
|
refresh_action = QAction("&Refresh", self)
|
|
refresh_action.setShortcut(QKeySequence("F5"))
|
|
refresh_action.triggered.connect(self._refresh_current_tab)
|
|
view_menu.addAction(refresh_action)
|
|
|
|
# Tools menu
|
|
tools_menu = menubar.addMenu("&Tools")
|
|
|
|
db_stats_action = QAction("Database &Statistics", self)
|
|
db_stats_action.triggered.connect(self._show_database_stats)
|
|
tools_menu.addAction(db_stats_action)
|
|
|
|
tools_menu.addSeparator()
|
|
|
|
delete_model_action = QAction("Delete &Model…", self)
|
|
delete_model_action.triggered.connect(self._show_delete_model_dialog)
|
|
tools_menu.addAction(delete_model_action)
|
|
|
|
# Help menu
|
|
help_menu = menubar.addMenu("&Help")
|
|
|
|
about_action = QAction("&About", self)
|
|
about_action.triggered.connect(self._show_about)
|
|
help_menu.addAction(about_action)
|
|
|
|
docs_action = QAction("&Documentation", self)
|
|
docs_action.triggered.connect(self._show_documentation)
|
|
help_menu.addAction(docs_action)
|
|
|
|
def _create_tab_widget(self):
|
|
"""Create main tab widget with all tabs."""
|
|
self.tab_widget = QTabWidget()
|
|
self.tab_widget.setTabPosition(QTabWidget.North)
|
|
|
|
# Create tabs
|
|
try:
|
|
self.detection_tab = DetectionTab(self.db_manager, self.config_manager)
|
|
self.training_tab = TrainingTab(self.db_manager, self.config_manager)
|
|
self.validation_tab = ValidationTab(self.db_manager, self.config_manager)
|
|
self.results_tab = ResultsTab(self.db_manager, self.config_manager)
|
|
self.annotation_tab = AnnotationTab(self.db_manager, self.config_manager)
|
|
|
|
# Add tabs to widget
|
|
self.tab_widget.addTab(self.detection_tab, "Detection")
|
|
self.tab_widget.addTab(self.results_tab, "Results")
|
|
self.tab_widget.addTab(self.annotation_tab, "Annotation")
|
|
self.tab_widget.addTab(self.training_tab, "Training")
|
|
self.tab_widget.addTab(self.validation_tab, "Validation")
|
|
|
|
# Connect tab change signal
|
|
self.tab_widget.currentChanged.connect(self._on_tab_changed)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating tabs: {e}")
|
|
# Create placeholder
|
|
placeholder = QWidget()
|
|
layout = QVBoxLayout()
|
|
layout.addWidget(QLabel(f"Error creating tabs: {e}"))
|
|
placeholder.setLayout(layout)
|
|
self.tab_widget.addTab(placeholder, "Error")
|
|
|
|
self.setCentralWidget(self.tab_widget)
|
|
|
|
def _create_status_bar(self):
|
|
"""Create status bar."""
|
|
self.status_bar = QStatusBar()
|
|
self.setStatusBar(self.status_bar)
|
|
|
|
# Add permanent widgets to status bar
|
|
self.status_label = QLabel("Ready")
|
|
self.status_bar.addWidget(self.status_label)
|
|
|
|
# Initial status message
|
|
self._update_status("Ready")
|
|
|
|
def _center_window(self):
|
|
"""Center window on screen."""
|
|
screen = self.screen().geometry()
|
|
size = self.geometry()
|
|
self.move((screen.width() - size.width()) // 2, (screen.height() - size.height()) // 2)
|
|
|
|
def _restore_window_state(self):
|
|
"""Restore window geometry from settings or center window."""
|
|
settings = QSettings("microscopy_app", "object_detection")
|
|
geometry = settings.value("main_window/geometry")
|
|
|
|
if geometry:
|
|
self.restoreGeometry(geometry)
|
|
logger.debug("Restored window geometry from settings")
|
|
else:
|
|
self._center_window()
|
|
logger.debug("Centered window on screen")
|
|
|
|
def _save_window_state(self):
|
|
"""Save window geometry to settings."""
|
|
settings = QSettings("microscopy_app", "object_detection")
|
|
settings.setValue("main_window/geometry", self.saveGeometry())
|
|
logger.debug("Saved window geometry to settings")
|
|
|
|
def _show_settings(self):
|
|
"""Show settings dialog."""
|
|
logger.info("Opening settings dialog")
|
|
dialog = ConfigDialog(self.config_manager, self)
|
|
if dialog.exec():
|
|
self._apply_settings()
|
|
self._update_status("Settings saved")
|
|
|
|
def _apply_settings(self):
|
|
"""Apply changed settings."""
|
|
logger.info("Applying settings changes")
|
|
# Reload configuration in all tabs if needed
|
|
try:
|
|
if hasattr(self, "detection_tab"):
|
|
self.detection_tab.refresh()
|
|
if hasattr(self, "training_tab"):
|
|
self.training_tab.refresh()
|
|
if hasattr(self, "results_tab"):
|
|
self.results_tab.refresh()
|
|
except Exception as e:
|
|
logger.error(f"Error applying settings: {e}")
|
|
|
|
def _refresh_current_tab(self):
|
|
"""Refresh the current tab."""
|
|
current_widget = self.tab_widget.currentWidget()
|
|
if hasattr(current_widget, "refresh"):
|
|
current_widget.refresh()
|
|
self._update_status("Tab refreshed")
|
|
|
|
def _on_tab_changed(self, index: int):
|
|
"""Handle tab change event."""
|
|
tab_name = self.tab_widget.tabText(index)
|
|
logger.debug(f"Switched to tab: {tab_name}")
|
|
self._update_status(f"Viewing: {tab_name}")
|
|
|
|
def _show_database_stats(self):
|
|
"""Show database statistics dialog."""
|
|
try:
|
|
stats = self.db_manager.get_detection_statistics()
|
|
|
|
message = f"""
|
|
<h3>Database Statistics</h3>
|
|
<p><b>Total Detections:</b> {stats.get('total_detections', 0)}</p>
|
|
<p><b>Average Confidence:</b> {stats.get('average_confidence', 0):.2%}</p>
|
|
<p><b>Classes:</b></p>
|
|
<ul>
|
|
"""
|
|
|
|
for class_name, count in stats.get("class_counts", {}).items():
|
|
message += f"<li>{class_name}: {count}</li>"
|
|
|
|
message += "</ul>"
|
|
|
|
QMessageBox.information(self, "Database Statistics", message)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting database stats: {e}")
|
|
QMessageBox.warning(self, "Error", f"Failed to get database statistics:\n{str(e)}")
|
|
|
|
def _show_delete_model_dialog(self) -> None:
|
|
"""Open the model deletion dialog."""
|
|
dialog = DeleteModelDialog(self.db_manager, self)
|
|
if not dialog.exec():
|
|
return
|
|
|
|
model_ids = dialog.selected_model_ids
|
|
if not model_ids:
|
|
return
|
|
|
|
self._delete_models(model_ids)
|
|
|
|
def _delete_models(self, model_ids: list[int]) -> None:
|
|
"""Delete one or more models from the database and remove artifacts from disk."""
|
|
|
|
deleted_count = 0
|
|
removed_paths: list[str] = []
|
|
remove_errors: list[str] = []
|
|
|
|
for model_id in model_ids:
|
|
model = None
|
|
try:
|
|
model = self.db_manager.get_model_by_id(int(model_id))
|
|
except Exception as exc:
|
|
logger.error(f"Failed to load model {model_id} before deletion: {exc}")
|
|
|
|
if not model:
|
|
remove_errors.append(f"Model id {model_id} not found in database.")
|
|
continue
|
|
|
|
try:
|
|
deleted = self.db_manager.delete_model(int(model_id))
|
|
except Exception as exc:
|
|
logger.error(f"Failed to delete model {model_id}: {exc}")
|
|
remove_errors.append(f"Failed to delete model id {model_id} from DB: {exc}")
|
|
continue
|
|
|
|
if not deleted:
|
|
remove_errors.append(f"Model id {model_id} was not deleted (already removed?).")
|
|
continue
|
|
|
|
deleted_count += 1
|
|
removed, errors = self._delete_model_artifacts_from_disk(model)
|
|
removed_paths.extend(removed)
|
|
remove_errors.extend(errors)
|
|
|
|
# Refresh tabs to reflect the deletion(s).
|
|
try:
|
|
if hasattr(self, "detection_tab"):
|
|
self.detection_tab.refresh()
|
|
if hasattr(self, "results_tab"):
|
|
self.results_tab.refresh()
|
|
if hasattr(self, "validation_tab"):
|
|
self.validation_tab.refresh()
|
|
if hasattr(self, "training_tab"):
|
|
self.training_tab.refresh()
|
|
except Exception as exc:
|
|
logger.warning(f"Failed to refresh tabs after model deletion: {exc}")
|
|
|
|
details: list[str] = []
|
|
if removed_paths:
|
|
details.append("Removed from disk:\n" + "\n".join(removed_paths))
|
|
if remove_errors:
|
|
details.append("\nDisk cleanup warnings:\n" + "\n".join(remove_errors))
|
|
|
|
QMessageBox.information(
|
|
self,
|
|
"Delete Model",
|
|
f"Deleted {deleted_count} model(s) from database." + ("\n\n" + "\n".join(details) if details else ""),
|
|
)
|
|
|
|
def _delete_model(self, model_id: int) -> None:
|
|
"""Delete a model from the database and remove its artifacts from disk."""
|
|
|
|
model = None
|
|
try:
|
|
model = self.db_manager.get_model_by_id(model_id)
|
|
except Exception as exc:
|
|
logger.error(f"Failed to load model {model_id} before deletion: {exc}")
|
|
|
|
if not model:
|
|
QMessageBox.warning(self, "Delete Model", "Selected model was not found in the database.")
|
|
return
|
|
|
|
model_path = str(model.get("model_path") or "")
|
|
|
|
try:
|
|
deleted = self.db_manager.delete_model(model_id)
|
|
except Exception as exc:
|
|
logger.error(f"Failed to delete model {model_id}: {exc}")
|
|
QMessageBox.critical(self, "Delete Model", f"Failed to delete model from database:\n{exc}")
|
|
return
|
|
|
|
if not deleted:
|
|
QMessageBox.warning(self, "Delete Model", "No model was deleted (it may have already been removed).")
|
|
return
|
|
|
|
removed_paths, remove_errors = self._delete_model_artifacts_from_disk(model)
|
|
|
|
# Refresh tabs to reflect the deletion.
|
|
try:
|
|
if hasattr(self, "detection_tab"):
|
|
self.detection_tab.refresh()
|
|
if hasattr(self, "results_tab"):
|
|
self.results_tab.refresh()
|
|
if hasattr(self, "validation_tab"):
|
|
self.validation_tab.refresh()
|
|
if hasattr(self, "training_tab"):
|
|
self.training_tab.refresh()
|
|
except Exception as exc:
|
|
logger.warning(f"Failed to refresh tabs after model deletion: {exc}")
|
|
|
|
details = []
|
|
if model_path:
|
|
details.append(f"Deleted model record for: {model_path}")
|
|
if removed_paths:
|
|
details.append("\nRemoved from disk:\n" + "\n".join(removed_paths))
|
|
if remove_errors:
|
|
details.append("\nDisk cleanup warnings:\n" + "\n".join(remove_errors))
|
|
|
|
QMessageBox.information(
|
|
self,
|
|
"Delete Model",
|
|
"Model deleted from database." + ("\n\n" + "\n".join(details) if details else ""),
|
|
)
|
|
|
|
def _delete_model_artifacts_from_disk(self, model: dict) -> tuple[list[str], list[str]]:
|
|
"""Best-effort removal of model artifacts on disk.
|
|
|
|
Strategy:
|
|
- Remove run directories inferred from:
|
|
- model.model_path (…/<run>/weights/*.pt => <run>)
|
|
- training_params.stage_results[].results.save_dir
|
|
but only if they are under the configured models directory.
|
|
- If the weights file itself exists and is outside the models directory, delete only the file.
|
|
|
|
Returns:
|
|
(removed_paths, errors)
|
|
"""
|
|
|
|
removed: list[str] = []
|
|
errors: list[str] = []
|
|
|
|
models_root = Path(self.config_manager.get_models_directory() or "data/models").expanduser()
|
|
try:
|
|
models_root_resolved = models_root.resolve()
|
|
except Exception:
|
|
models_root_resolved = models_root
|
|
|
|
inferred_dirs: list[Path] = []
|
|
|
|
# 1) From model_path
|
|
model_path_value = model.get("model_path")
|
|
if model_path_value:
|
|
try:
|
|
p = Path(str(model_path_value)).expanduser()
|
|
p_resolved = p.resolve() if p.exists() else p
|
|
if p_resolved.is_file():
|
|
if p_resolved.parent.name == "weights" and p_resolved.parent.parent.exists():
|
|
inferred_dirs.append(p_resolved.parent.parent)
|
|
elif p_resolved.parent.exists():
|
|
inferred_dirs.append(p_resolved.parent)
|
|
except Exception:
|
|
pass
|
|
|
|
# 2) From training_params.stage_results[].results.save_dir
|
|
training_params = model.get("training_params") or {}
|
|
if isinstance(training_params, dict):
|
|
stage_results = training_params.get("stage_results")
|
|
if isinstance(stage_results, list):
|
|
for stage in stage_results:
|
|
results = (stage or {}).get("results")
|
|
save_dir = (results or {}).get("save_dir") if isinstance(results, dict) else None
|
|
if not save_dir:
|
|
continue
|
|
try:
|
|
d = Path(str(save_dir)).expanduser()
|
|
if d.exists() and d.is_dir():
|
|
inferred_dirs.append(d)
|
|
except Exception:
|
|
continue
|
|
|
|
# Deduplicate inferred_dirs
|
|
unique_dirs: list[Path] = []
|
|
seen: set[str] = set()
|
|
for d in inferred_dirs:
|
|
try:
|
|
key = str(d.resolve())
|
|
except Exception:
|
|
key = str(d)
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
unique_dirs.append(d)
|
|
|
|
# Delete directories under models_root
|
|
for d in unique_dirs:
|
|
try:
|
|
d_resolved = d.resolve()
|
|
except Exception:
|
|
d_resolved = d
|
|
try:
|
|
if d_resolved.exists() and d_resolved.is_dir() and d_resolved.is_relative_to(models_root_resolved):
|
|
shutil.rmtree(d_resolved)
|
|
removed.append(str(d_resolved))
|
|
except Exception as exc:
|
|
errors.append(f"Failed to remove directory {d_resolved}: {exc}")
|
|
|
|
# If nothing matched (e.g., model_path outside models_root), delete just the file.
|
|
if model_path_value:
|
|
try:
|
|
p = Path(str(model_path_value)).expanduser()
|
|
if p.exists() and p.is_file():
|
|
p_resolved = p.resolve()
|
|
if not p_resolved.is_relative_to(models_root_resolved):
|
|
p_resolved.unlink()
|
|
removed.append(str(p_resolved))
|
|
except Exception as exc:
|
|
errors.append(f"Failed to remove model file {model_path_value}: {exc}")
|
|
|
|
return removed, errors
|
|
|
|
def _show_about(self):
|
|
"""Show about dialog."""
|
|
about_text = """
|
|
<h2>Microscopy Object Detection Application</h2>
|
|
<p><b>Version:</b> 1.0.0</p>
|
|
<p>A desktop application for detecting organelles and membrane branching
|
|
structures in microscopy images using YOLOv8.</p>
|
|
|
|
<p><b>Features:</b></p>
|
|
<ul>
|
|
<li>Object detection with YOLOv8</li>
|
|
<li>Model training and validation</li>
|
|
<li>Detection results storage</li>
|
|
<li>Interactive visualization</li>
|
|
<li>Export capabilities</li>
|
|
</ul>
|
|
|
|
<p><b>Technologies:</b></p>
|
|
<ul>
|
|
<li>Ultralytics YOLOv8</li>
|
|
<li>PySide6</li>
|
|
<li>pyqtgraph</li>
|
|
<li>SQLite</li>
|
|
</ul>
|
|
"""
|
|
|
|
QMessageBox.about(self, "About", about_text)
|
|
|
|
def _show_documentation(self):
|
|
"""Show documentation."""
|
|
QMessageBox.information(
|
|
self,
|
|
"Documentation",
|
|
"Please refer to README.md and ARCHITECTURE.md files in the project directory.",
|
|
)
|
|
|
|
def _update_status(self, message: str, timeout: int = 5000):
|
|
"""
|
|
Update status bar message.
|
|
|
|
Args:
|
|
message: Status message to display
|
|
timeout: Time in milliseconds to show message (0 for permanent)
|
|
"""
|
|
self.status_label.setText(message)
|
|
if timeout > 0:
|
|
QTimer.singleShot(timeout, lambda: self.status_label.setText("Ready"))
|
|
|
|
def closeEvent(self, event):
|
|
"""Handle window close event."""
|
|
reply = QMessageBox.question(
|
|
self,
|
|
"Confirm Exit",
|
|
"Are you sure you want to exit?",
|
|
QMessageBox.Yes | QMessageBox.No,
|
|
QMessageBox.No,
|
|
)
|
|
|
|
if reply == QMessageBox.Yes:
|
|
# Save window state before closing
|
|
self._save_window_state()
|
|
|
|
# Persist tab state and stop background work before exit
|
|
if hasattr(self, "training_tab"):
|
|
self.training_tab.shutdown()
|
|
if hasattr(self, "annotation_tab"):
|
|
self.annotation_tab.save_state()
|
|
|
|
logger.info("Application closing")
|
|
event.accept()
|
|
else:
|
|
event.ignore()
|