34 Commits

Author SHA1 Message Date
506c74e53a Small update 2026-01-16 10:39:46 +02:00
eefda5b878 Adding metadata to tiled images 2026-01-16 10:39:14 +02:00
31cb6a6c8e Using 8bit images 2026-01-16 10:38:34 +02:00
0c19ea2557 Updating 2026-01-16 10:30:13 +02:00
89e47591db Formatting 2026-01-16 10:27:15 +02:00
69cde09e53 Changing alpha value 2026-01-16 10:26:25 +02:00
fcbd5fb16d correcting label writing and formatting code 2026-01-16 10:24:19 +02:00
ca52312925 Adding LIKE option for filtering queries 2026-01-16 10:18:48 +02:00
0a93bf797a Adding auto zoom when result is loaded 2026-01-12 14:15:02 +02:00
d998c65665 Updating image splitter 2026-01-12 13:28:00 +02:00
510eabfa94 Adding splitter method 2026-01-05 13:56:57 +02:00
395d263900 Update 2026-01-05 08:59:36 +02:00
e98d287b8a Updating tiff image patch 2026-01-02 12:44:06 +02:00
d25101de2d adding files 2026-01-02 12:40:44 +02:00
f88beef188 Another test 2025-12-19 13:50:49 +02:00
2fd9a2acf4 RGB 2025-12-19 13:31:24 +02:00
2bcd18cc75 Bug fix 2025-12-19 13:13:12 +02:00
5d25378c46 Testing with uint conversion 2025-12-19 13:10:36 +02:00
2b0b48921e Testing more grayscale 2025-12-19 12:02:11 +02:00
b0c05f0225 testing grayscale 2025-12-19 11:55:38 +02:00
97badaa390 Small update 2025-12-19 11:31:12 +02:00
8f8132ce61 Testing detect 2025-12-19 10:44:11 +02:00
6ae7481e25 Adding debug messages 2025-12-19 10:15:53 +02:00
061f8b3ca2 Fixing pseudo rgb 2025-12-19 09:56:43 +02:00
a8e5db3135 Small change 2025-12-18 13:03:12 +02:00
268ed5175e Applying pseudo channels for RGB 2025-12-18 12:52:13 +02:00
5e9d3b1dc4 Adding logger 2025-12-18 12:04:41 +02:00
7d83e9b9b1 Adding important file 2025-12-17 00:45:56 +02:00
e364d06217 Implementing uint16 reading with tifffile 2025-12-16 23:02:45 +02:00
e5036c10cf Small fix 2025-12-16 18:03:56 +02:00
c7e388d9ae Updating progressbar 2025-12-16 17:20:25 +02:00
6b995e7325 Update 2025-12-16 13:24:20 +02:00
0e0741d323 Update on convert_grayscale_to_rgb_preserve_range, making it class method 2025-12-16 12:37:34 +02:00
dd99a0677c Updating image converter and adding simple script to visualize segmentation 2025-12-16 11:27:38 +02:00
14 changed files with 1151 additions and 494 deletions

View File

@@ -1,57 +0,0 @@
database:
path: data/detections.db
image_repository:
base_path: ''
allowed_extensions:
- .jpg
- .jpeg
- .png
- .tif
- .tiff
- .bmp
models:
default_base_model: yolov8s-seg.pt
models_directory: data/models
base_model_choices:
- yolov8s-seg.pt
- yolo11s-seg.pt
training:
default_epochs: 100
default_batch_size: 16
default_imgsz: 1024
default_patience: 50
default_lr0: 0.01
two_stage:
enabled: false
stage1:
epochs: 20
lr0: 0.0005
patience: 10
freeze: 10
stage2:
epochs: 150
lr0: 0.0003
patience: 30
last_dataset_yaml: /home/martin/code/object_detection/data/datasets/data.yaml
last_dataset_dir: /home/martin/code/object_detection/data/datasets
detection:
default_confidence: 0.25
default_iou: 0.45
max_batch_size: 100
visualization:
bbox_colors:
organelle: '#FF6B6B'
membrane_branch: '#4ECDC4'
default: '#00FF00'
bbox_thickness: 2
font_size: 12
export:
formats:
- csv
- json
- excel
default_format: csv
logging:
level: INFO
file: logs/app.log
format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
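The deleted config above is consumed through dotted-key lookups such as config_manager.get("models.default_base_model", ...) seen later in this diff. A minimal sketch of such a lookup over the nested YAML, assuming a plain dict walk and a config.yaml path (the real ConfigManager may differ):

# Sketch only: resolve dotted keys like "training.two_stage.stage1.epochs"
# against the nested settings shown above. "config.yaml" is an assumed path.
import yaml

def get_setting(config: dict, dotted_key: str, default=None):
    """Walk a nested dict using a dotted key such as 'models.default_base_model'."""
    node = config
    for part in dotted_key.split("."):
        if not isinstance(node, dict) or part not in node:
            return default
        node = node[part]
    return node

with open("config.yaml", "r", encoding="utf-8") as fh:
    cfg = yaml.safe_load(fh)

print(get_setting(cfg, "models.default_base_model", "yolov8s-seg.pt"))
print(get_setting(cfg, "training.two_stage.stage1.epochs", 20))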

View File

@@ -82,12 +82,12 @@ include-package-data = true
"src.database" = ["*.sql"]
[tool.black]
line-length = 88
line-length = 120
target-version = ['py38', 'py39', 'py310', 'py311']
include = '\.pyi?$'
[tool.pylint.messages_control]
max-line-length = 88
max-line-length = 120
[tool.mypy]
python_version = "3.8"

View File

@@ -60,9 +60,7 @@ class DatabaseManager:
cursor = conn.cursor()
# Check if annotations table exists
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='annotations'"
)
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='annotations'")
if not cursor.fetchone():
# Table doesn't exist yet, no migration needed
return
@@ -242,9 +240,7 @@ class DatabaseManager:
return cursor.lastrowid
except sqlite3.IntegrityError:
# Image already exists, return its ID
cursor.execute(
"SELECT id FROM images WHERE relative_path = ?", (relative_path,)
)
cursor.execute("SELECT id FROM images WHERE relative_path = ?", (relative_path,))
row = cursor.fetchone()
return row["id"] if row else None
finally:
@@ -255,17 +251,13 @@ class DatabaseManager:
conn = self.get_connection()
try:
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM images WHERE relative_path = ?", (relative_path,)
)
cursor.execute("SELECT * FROM images WHERE relative_path = ?", (relative_path,))
row = cursor.fetchone()
return dict(row) if row else None
finally:
conn.close()
def get_or_create_image(
self, relative_path: str, filename: str, width: int, height: int
) -> int:
def get_or_create_image(self, relative_path: str, filename: str, width: int, height: int) -> int:
"""Get existing image or create new one."""
existing = self.get_image_by_path(relative_path)
if existing:
@@ -355,16 +347,8 @@ class DatabaseManager:
bbox[2],
bbox[3],
det["confidence"],
(
json.dumps(det.get("segmentation_mask"))
if det.get("segmentation_mask")
else None
),
(
json.dumps(det.get("metadata"))
if det.get("metadata")
else None
),
(json.dumps(det.get("segmentation_mask")) if det.get("segmentation_mask") else None),
(json.dumps(det.get("metadata")) if det.get("metadata") else None),
),
)
conn.commit()
@@ -409,15 +393,16 @@ class DatabaseManager:
if filters:
conditions = []
for key, value in filters.items():
if (
key.startswith("d.")
or key.startswith("i.")
or key.startswith("m.")
):
conditions.append(f"{key} = ?")
if key.startswith("d.") or key.startswith("i.") or key.startswith("m."):
if "like" in value.lower():
conditions.append(f"{key} LIKE ?")
params.append(value.split(" ")[1])
else:
conditions.append(f"{key} = ?")
params.append(value)
else:
conditions.append(f"d.{key} = ?")
params.append(value)
params.append(value)
query += " WHERE " + " AND ".join(conditions)
query += " ORDER BY d.detected_at DESC"
@@ -442,18 +427,14 @@ class DatabaseManager:
finally:
conn.close()
def get_detections_for_image(
self, image_id: int, model_id: Optional[int] = None
) -> List[Dict]:
def get_detections_for_image(self, image_id: int, model_id: Optional[int] = None) -> List[Dict]:
"""Get all detections for a specific image."""
filters = {"image_id": image_id}
if model_id:
filters["model_id"] = model_id
return self.get_detections(filters)
def delete_detections_for_image(
self, image_id: int, model_id: Optional[int] = None
) -> int:
def delete_detections_for_image(self, image_id: int, model_id: Optional[int] = None) -> int:
"""Delete detections tied to a specific image and optional model."""
conn = self.get_connection()
try:
@@ -524,9 +505,7 @@ class DatabaseManager:
""",
params,
)
class_counts = {
row["class_name"]: row["count"] for row in cursor.fetchall()
}
class_counts = {row["class_name"]: row["count"] for row in cursor.fetchall()}
# Average confidence
cursor.execute(
@@ -583,9 +562,7 @@ class DatabaseManager:
# ==================== Export Operations ====================
def export_detections_to_csv(
self, output_path: str, filters: Optional[Dict] = None
) -> bool:
def export_detections_to_csv(self, output_path: str, filters: Optional[Dict] = None) -> bool:
"""Export detections to CSV file."""
try:
detections = self.get_detections(filters)
@@ -614,9 +591,7 @@ class DatabaseManager:
for det in detections:
row = {k: det[k] for k in fieldnames if k in det}
# Convert segmentation mask list to JSON string for CSV
if row.get("segmentation_mask") and isinstance(
row["segmentation_mask"], list
):
if row.get("segmentation_mask") and isinstance(row["segmentation_mask"], list):
row["segmentation_mask"] = json.dumps(row["segmentation_mask"])
writer.writerow(row)
@@ -625,9 +600,7 @@ class DatabaseManager:
print(f"Error exporting to CSV: {e}")
return False
def export_detections_to_json(
self, output_path: str, filters: Optional[Dict] = None
) -> bool:
def export_detections_to_json(self, output_path: str, filters: Optional[Dict] = None) -> bool:
"""Export detections to JSON file."""
try:
detections = self.get_detections(filters)
@@ -785,17 +758,13 @@ class DatabaseManager:
conn = self.get_connection()
try:
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM object_classes WHERE class_name = ?", (class_name,)
)
cursor.execute("SELECT * FROM object_classes WHERE class_name = ?", (class_name,))
row = cursor.fetchone()
return dict(row) if row else None
finally:
conn.close()
def add_object_class(
self, class_name: str, color: str, description: Optional[str] = None
) -> int:
def add_object_class(self, class_name: str, color: str, description: Optional[str] = None) -> int:
"""
Add a new object class.
@@ -928,8 +897,7 @@ class DatabaseManager:
if not split_map[required]:
raise ValueError(
"Unable to determine %s image directory under %s. Provide it "
"explicitly via the 'splits' argument."
% (required, dataset_root_path)
"explicitly via the 'splits' argument." % (required, dataset_root_path)
)
yaml_splits: Dict[str, str] = {}
@@ -955,11 +923,7 @@ class DatabaseManager:
if yaml_splits.get("test"):
payload["test"] = yaml_splits["test"]
output_path_obj = (
Path(output_path).expanduser()
if output_path
else dataset_root_path / "data.yaml"
)
output_path_obj = Path(output_path).expanduser() if output_path else dataset_root_path / "data.yaml"
output_path_obj.parent.mkdir(parents=True, exist_ok=True)
with open(output_path_obj, "w", encoding="utf-8") as handle:
@@ -1019,15 +983,9 @@ class DatabaseManager:
for split_name, options in patterns.items():
for relative in options:
candidate = (dataset_root / relative).resolve()
if (
candidate.exists()
and candidate.is_dir()
and self._directory_has_images(candidate)
):
if candidate.exists() and candidate.is_dir() and self._directory_has_images(candidate):
try:
inferred[split_name] = candidate.relative_to(
dataset_root
).as_posix()
inferred[split_name] = candidate.relative_to(dataset_root).as_posix()
except ValueError:
inferred[split_name] = candidate.as_posix()
break
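The LIKE branch added to the filter loop above turns filter values of the form "LIKE &lt;pattern&gt;" into SQL LIKE conditions. A standalone sketch of that translation, with the key prefixes and value format inferred from the diff (this is not the actual DatabaseManager method):

# Sketch: build a WHERE clause the way the patched get_detections() does.
def build_where(filters):
    conditions, params = [], []
    for key, value in filters.items():
        if key.startswith(("d.", "i.", "m.")):
            if isinstance(value, str) and "like" in value.lower():
                conditions.append(f"{key} LIKE ?")
                params.append(value.split(" ")[1])  # value assumed to be "LIKE <pattern>"
            else:
                conditions.append(f"{key} = ?")
                params.append(value)
        else:
            conditions.append(f"d.{key} = ?")
            params.append(value)
    return " WHERE " + " AND ".join(conditions), params

clause, params = build_where({"i.relative_path": "LIKE %tile%", "model_id": 3})
print(clause)   # " WHERE i.relative_path LIKE ? AND d.model_id = ?"
print(params)   # ['%tile%', 3]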

View File

@@ -35,9 +35,7 @@ logger = get_logger(__name__)
class ResultsTab(QWidget):
"""Results tab showing detection history and preview overlays."""
def __init__(
self, db_manager: DatabaseManager, config_manager: ConfigManager, parent=None
):
def __init__(self, db_manager: DatabaseManager, config_manager: ConfigManager, parent=None):
super().__init__(parent)
self.db_manager = db_manager
self.config_manager = config_manager
@@ -71,24 +69,12 @@ class ResultsTab(QWidget):
left_layout.addLayout(controls_layout)
self.results_table = QTableWidget(0, 5)
self.results_table.setHorizontalHeaderLabels(
["Image", "Model", "Detections", "Classes", "Last Updated"]
)
self.results_table.horizontalHeader().setSectionResizeMode(
0, QHeaderView.Stretch
)
self.results_table.horizontalHeader().setSectionResizeMode(
1, QHeaderView.Stretch
)
self.results_table.horizontalHeader().setSectionResizeMode(
2, QHeaderView.ResizeToContents
)
self.results_table.horizontalHeader().setSectionResizeMode(
3, QHeaderView.Stretch
)
self.results_table.horizontalHeader().setSectionResizeMode(
4, QHeaderView.ResizeToContents
)
self.results_table.setHorizontalHeaderLabels(["Image", "Model", "Detections", "Classes", "Last Updated"])
self.results_table.horizontalHeader().setSectionResizeMode(0, QHeaderView.Stretch)
self.results_table.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch)
self.results_table.horizontalHeader().setSectionResizeMode(2, QHeaderView.ResizeToContents)
self.results_table.horizontalHeader().setSectionResizeMode(3, QHeaderView.Stretch)
self.results_table.horizontalHeader().setSectionResizeMode(4, QHeaderView.ResizeToContents)
self.results_table.setSelectionBehavior(QAbstractItemView.SelectRows)
self.results_table.setSelectionMode(QAbstractItemView.SingleSelection)
self.results_table.setEditTriggers(QAbstractItemView.NoEditTriggers)
@@ -106,6 +92,8 @@ class ResultsTab(QWidget):
preview_layout = QVBoxLayout()
self.preview_canvas = AnnotationCanvasWidget()
# Auto-zoom so newly loaded images fill the available preview viewport.
self.preview_canvas.set_auto_fit_to_view(True)
self.preview_canvas.set_polyline_enabled(False)
self.preview_canvas.set_show_bboxes(True)
preview_layout.addWidget(self.preview_canvas)
@@ -119,9 +107,7 @@ class ResultsTab(QWidget):
self.show_bboxes_checkbox.stateChanged.connect(self._toggle_bboxes)
self.show_confidence_checkbox = QCheckBox("Show Confidence")
self.show_confidence_checkbox.setChecked(False)
self.show_confidence_checkbox.stateChanged.connect(
self._apply_detection_overlays
)
self.show_confidence_checkbox.stateChanged.connect(self._apply_detection_overlays)
toggles_layout.addWidget(self.show_masks_checkbox)
toggles_layout.addWidget(self.show_bboxes_checkbox)
toggles_layout.addWidget(self.show_confidence_checkbox)
@@ -169,8 +155,7 @@ class ResultsTab(QWidget):
"image_id": det["image_id"],
"model_id": det["model_id"],
"image_path": det.get("image_path"),
"image_filename": det.get("image_filename")
or det.get("image_path"),
"image_filename": det.get("image_filename") or det.get("image_path"),
"model_name": det.get("model_name", ""),
"model_version": det.get("model_version", ""),
"last_detected": det.get("detected_at"),
@@ -183,8 +168,7 @@ class ResultsTab(QWidget):
entry["count"] += 1
if det.get("detected_at") and (
not entry.get("last_detected")
or str(det.get("detected_at")) > str(entry.get("last_detected"))
not entry.get("last_detected") or str(det.get("detected_at")) > str(entry.get("last_detected"))
):
entry["last_detected"] = det.get("detected_at")
if det.get("class_name"):
@@ -214,9 +198,7 @@ class ResultsTab(QWidget):
for row, entry in enumerate(self.detection_summary):
model_label = f"{entry['model_name']} {entry['model_version']}".strip()
class_list = (
", ".join(sorted(entry["classes"])) if entry["classes"] else "-"
)
class_list = ", ".join(sorted(entry["classes"])) if entry["classes"] else "-"
items = [
QTableWidgetItem(entry.get("image_filename", "")),

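The results table rows above are built by grouping raw detection rows per (image, model) pair: counting detections, collecting class names, and keeping the most recent detected_at. A hedged standalone sketch of that grouping, using illustrative detection dicts whose keys mirror the rows in the diff:

# Sketch only: the aggregation behind the summary table (values are illustrative).
detections = [
    {"image_id": 1, "model_id": 2, "class_name": "organelle", "detected_at": "2026-01-12 14:00"},
    {"image_id": 1, "model_id": 2, "class_name": "membrane_branch", "detected_at": "2026-01-12 14:05"},
]

summary = {}
for det in detections:
    key = (det["image_id"], det["model_id"])
    entry = summary.setdefault(key, {"count": 0, "classes": set(), "last_detected": None})
    entry["count"] += 1
    if det.get("class_name"):
        entry["classes"].add(det["class_name"])
    if det.get("detected_at") and (
        not entry["last_detected"] or str(det["detected_at"]) > str(entry["last_detected"])
    ):
        entry["last_detected"] = det["detected_at"]

print(summary)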
View File

@@ -10,6 +10,7 @@ from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import yaml
import numpy as np
from PySide6.QtCore import Qt, QThread, Signal
from PySide6.QtWidgets import (
QWidget,
@@ -34,7 +35,7 @@ from PySide6.QtWidgets import (
from src.database.db_manager import DatabaseManager
from src.model.yolo_wrapper import YOLOWrapper
from src.utils.config_manager import ConfigManager
from src.utils.image import Image, convert_grayscale_to_rgb_preserve_range
from src.utils.image import Image
from src.utils.logger import get_logger
@@ -91,10 +92,7 @@ class TrainingWorker(QThread):
},
}
]
computed_total = sum(
max(0, int((stage.get("params") or {}).get("epochs", 0)))
for stage in self.stage_plan
)
computed_total = sum(max(0, int((stage.get("params") or {}).get("epochs", 0))) for stage in self.stage_plan)
self.total_epochs = total_epochs if total_epochs else computed_total or epochs
self._stop_requested = False
@@ -201,9 +199,7 @@ class TrainingWorker(QThread):
class TrainingTab(QWidget):
"""Training tab for model training."""
def __init__(
self, db_manager: DatabaseManager, config_manager: ConfigManager, parent=None
):
def __init__(self, db_manager: DatabaseManager, config_manager: ConfigManager, parent=None):
super().__init__(parent)
self.db_manager = db_manager
self.config_manager = config_manager
@@ -337,18 +333,14 @@ class TrainingTab(QWidget):
self.model_version_edit = QLineEdit("v1")
form_layout.addRow("Version:", self.model_version_edit)
default_base_model = self.config_manager.get(
"models.default_base_model", "yolov8s-seg.pt"
)
default_base_model = self.config_manager.get("models.default_base_model", "yolov8s-seg.pt")
base_model_choices = self.config_manager.get("models.base_model_choices", [])
self.base_model_combo = QComboBox()
self.base_model_combo.addItem("Custom path…", "")
for choice in base_model_choices:
self.base_model_combo.addItem(choice, choice)
self.base_model_combo.currentIndexChanged.connect(
self._on_base_model_preset_changed
)
self.base_model_combo.currentIndexChanged.connect(self._on_base_model_preset_changed)
form_layout.addRow("Base Model Preset:", self.base_model_combo)
base_model_layout = QHBoxLayout()
@@ -434,12 +426,8 @@ class TrainingTab(QWidget):
group_layout = QVBoxLayout()
self.two_stage_checkbox = QCheckBox("Enable staged head-only + full fine-tune")
two_stage_defaults = (
training_defaults.get("two_stage", {}) if training_defaults else {}
)
self.two_stage_checkbox.setChecked(
bool(two_stage_defaults.get("enabled", False))
)
two_stage_defaults = training_defaults.get("two_stage", {}) if training_defaults else {}
self.two_stage_checkbox.setChecked(bool(two_stage_defaults.get("enabled", False)))
self.two_stage_checkbox.toggled.connect(self._on_two_stage_toggled)
group_layout.addWidget(self.two_stage_checkbox)
@@ -501,9 +489,7 @@ class TrainingTab(QWidget):
stage2_group.setLayout(stage2_form)
controls_layout.addWidget(stage2_group)
helper_label = QLabel(
"When enabled, staged hyperparameters override the global epochs/patience/lr."
)
helper_label = QLabel("When enabled, staged hyperparameters override the global epochs/patience/lr.")
helper_label.setWordWrap(True)
controls_layout.addWidget(helper_label)
@@ -548,9 +534,7 @@ class TrainingTab(QWidget):
if normalized == preset_value:
target_index = idx
break
if normalized.endswith(f"/{preset_value}") or normalized.endswith(
f"\\{preset_value}"
):
if normalized.endswith(f"/{preset_value}") or normalized.endswith(f"\\{preset_value}"):
target_index = idx
break
self.base_model_combo.blockSignals(True)
@@ -638,9 +622,7 @@ class TrainingTab(QWidget):
def _browse_dataset(self):
"""Open a file dialog to manually select data.yaml."""
start_dir = self.config_manager.get(
"training.last_dataset_dir", "data/datasets"
)
start_dir = self.config_manager.get("training.last_dataset_dir", "data/datasets")
start_path = Path(start_dir).expanduser()
if not start_path.exists():
start_path = Path.cwd()
@@ -676,9 +658,7 @@ class TrainingTab(QWidget):
return
except Exception as exc:
logger.exception("Unexpected error while generating data.yaml")
self._display_dataset_error(
"Unexpected error while generating data.yaml. Check logs for details."
)
self._display_dataset_error("Unexpected error while generating data.yaml. Check logs for details.")
QMessageBox.critical(
self,
"data.yaml Generation Failed",
@@ -755,13 +735,9 @@ class TrainingTab(QWidget):
self.selected_dataset = info
self.dataset_root_label.setText(info["root"]) # type: ignore[arg-type]
self.train_count_label.setText(
self._format_split_info(info["splits"].get("train"))
)
self.train_count_label.setText(self._format_split_info(info["splits"].get("train")))
self.val_count_label.setText(self._format_split_info(info["splits"].get("val")))
self.test_count_label.setText(
self._format_split_info(info["splits"].get("test"))
)
self.test_count_label.setText(self._format_split_info(info["splits"].get("test")))
self.num_classes_label.setText(str(info["num_classes"]))
class_names = ", ".join(info["class_names"]) or ""
self.class_names_label.setText(class_names)
@@ -815,18 +791,12 @@ class TrainingTab(QWidget):
if split_path.exists():
split_info["count"] = self._count_images(split_path)
if split_info["count"] == 0:
warnings.append(
f"No images found for {split_name} split at {split_path}"
)
warnings.append(f"No images found for {split_name} split at {split_path}")
else:
warnings.append(
f"{split_name.capitalize()} path does not exist: {split_path}"
)
warnings.append(f"{split_name.capitalize()} path does not exist: {split_path}")
else:
if split_name in ("train", "val"):
warnings.append(
f"{split_name.capitalize()} split missing in data.yaml"
)
warnings.append(f"{split_name.capitalize()} split missing in data.yaml")
splits[split_name] = split_info
names_list = self._normalize_class_names(data.get("names"))
@@ -844,9 +814,7 @@ class TrainingTab(QWidget):
if not names_list and nc_value:
names_list = [f"class_{idx}" for idx in range(int(nc_value))]
elif nc_value and len(names_list) not in (0, int(nc_value)):
warnings.append(
f"Number of class names ({len(names_list)}) does not match nc={nc_value}"
)
warnings.append(f"Number of class names ({len(names_list)}) does not match nc={nc_value}")
dataset_name = data.get("name") or base_path.name
@@ -898,16 +866,12 @@ class TrainingTab(QWidget):
class_index_map = self._build_class_index_map(dataset_info)
if not class_index_map:
self._append_training_log(
"Skipping label export: dataset classes do not match database entries."
)
self._append_training_log("Skipping label export: dataset classes do not match database entries.")
return
dataset_root_str = dataset_info.get("root")
dataset_yaml_path = dataset_info.get("yaml_path")
dataset_yaml = (
Path(dataset_yaml_path).expanduser() if dataset_yaml_path else None
)
dataset_yaml = Path(dataset_yaml_path).expanduser() if dataset_yaml_path else None
dataset_root: Optional[Path]
if dataset_root_str:
dataset_root = Path(dataset_root_str).resolve()
@@ -941,7 +905,9 @@ class TrainingTab(QWidget):
if stats["registered_images"]:
message += f" {stats['registered_images']} image(s) had database-backed annotations."
if stats["missing_records"]:
message += f" {stats['missing_records']} image(s) had no database entry; empty label files were written."
message += (
f" {stats['missing_records']} image(s) had no database entry; empty label files were written."
)
split_messages.append(message)
for msg in split_messages:
@@ -973,9 +939,7 @@ class TrainingTab(QWidget):
continue
processed_images += 1
label_path = (labels_dir / image_file.relative_to(images_dir)).with_suffix(
".txt"
)
label_path = (labels_dir / image_file.relative_to(images_dir)).with_suffix(".txt")
label_path.parent.mkdir(parents=True, exist_ok=True)
found, annotation_entries = self._fetch_annotations_for_image(
@@ -991,25 +955,23 @@ class TrainingTab(QWidget):
for entry in annotation_entries:
polygon = entry.get("polygon") or []
if polygon:
print(image_file, polygon[:4], polygon[-2:], entry.get("bbox"))
# coords = " ".join(f"{value:.6f}" for value in entry.get("bbox"))
# coords += " "
coords = " ".join(f"{value:.6f}" for value in polygon)
handle.write(f"{entry['class_idx']} {coords}\n")
annotations_written += 1
elif entry.get("bbox"):
x_center, y_center, width, height = entry["bbox"]
handle.write(
f"{entry['class_idx']} {x_center:.6f} {y_center:.6f} {width:.6f} {height:.6f}\n"
)
handle.write(f"{entry['class_idx']} {x_center:.6f} {y_center:.6f} {width:.6f} {height:.6f}\n")
annotations_written += 1
total_annotations += annotations_written
cache_reset_root = labels_dir.parent
self._invalidate_split_cache(cache_reset_root)
if processed_images == 0:
self._append_training_log(
f"[{split_name}] No images found to export labels for."
)
self._append_training_log(f"[{split_name}] No images found to export labels for.")
return None
return {
@@ -1135,6 +1097,10 @@ class TrainingTab(QWidget):
xs.append(x_val)
ys.append(y_val)
if any(np.abs(np.array(coords[:2]) - np.array(coords[-2:])) < 1e-5):
print("Closing polygon")
coords.extend(coords[:2])
if len(coords) < 6:
continue
@@ -1147,6 +1113,11 @@ class TrainingTab(QWidget):
+ abs((min(ys) if ys else 0.0) - y_min)
+ abs((max(ys) if ys else 0.0) - y_max)
)
width = max(0.0, x_max - x_min)
height = max(0.0, y_max - y_min)
x_center = x_min + width / 2.0
y_center = y_min + height / 2.0
score = (x_center, y_center, width, height)
candidates.append((score, coords))
@@ -1164,13 +1135,10 @@ class TrainingTab(QWidget):
return 1.0
return value
def _prepare_dataset_for_training(
self, dataset_yaml: Path, dataset_info: Optional[Dict[str, Any]] = None
) -> Path:
def _prepare_dataset_for_training(self, dataset_yaml: Path, dataset_info: Optional[Dict[str, Any]] = None) -> Path:
dataset_info = dataset_info or (
self.selected_dataset
if self.selected_dataset
and self.selected_dataset.get("yaml_path") == str(dataset_yaml)
if self.selected_dataset and self.selected_dataset.get("yaml_path") == str(dataset_yaml)
else self._parse_dataset_yaml(dataset_yaml)
)
@@ -1189,14 +1157,10 @@ class TrainingTab(QWidget):
cache_root = self._get_rgb_cache_root(dataset_yaml)
rgb_yaml = cache_root / "data.yaml"
if rgb_yaml.exists():
self._append_training_log(
f"Detected grayscale dataset; reusing RGB cache at {cache_root}"
)
self._append_training_log(f"Detected grayscale dataset; reusing RGB cache at {cache_root}")
return rgb_yaml
self._append_training_log(
f"Detected grayscale dataset; creating RGB cache at {cache_root}"
)
self._append_training_log(f"Detected grayscale dataset; creating RGB cache at {cache_root}")
self._build_rgb_dataset(cache_root, dataset_info)
return rgb_yaml
@@ -1303,6 +1267,14 @@ class TrainingTab(QWidget):
sample_image = self._find_first_image(images_dir)
if not sample_image:
return False
# Do not force an RGB cache for TIFF datasets.
# We handle grayscale/16-bit TIFFs via runtime Ultralytics patches that:
# - load TIFFs with `tifffile`
# - replicate grayscale to 3 channels without quantization
# - normalize uint16 correctly during training
if sample_image.suffix.lower() in {".tif", ".tiff"}:
return False
try:
img = Image(sample_image)
return img.pil_image.mode.upper() != "RGB"
@@ -1368,7 +1340,7 @@ class TrainingTab(QWidget):
img_obj = Image(src)
pil_img = img_obj.pil_image
if len(pil_img.getbands()) == 1:
rgb_img = convert_grayscale_to_rgb_preserve_range(pil_img)
rgb_img = img_obj.convert_grayscale_to_rgb_preserve_range()
else:
rgb_img = pil_img.convert("RGB")
rgb_img.save(dst)
@@ -1455,15 +1427,12 @@ class TrainingTab(QWidget):
dataset_path = Path(dataset_yaml).expanduser()
if not dataset_path.exists():
QMessageBox.warning(
self, "Invalid Dataset", "Selected data.yaml file does not exist."
)
QMessageBox.warning(self, "Invalid Dataset", "Selected data.yaml file does not exist.")
return
dataset_info = (
self.selected_dataset
if self.selected_dataset
and self.selected_dataset.get("yaml_path") == str(dataset_path)
if self.selected_dataset and self.selected_dataset.get("yaml_path") == str(dataset_path)
else self._parse_dataset_yaml(dataset_path)
)
@@ -1472,16 +1441,12 @@ class TrainingTab(QWidget):
dataset_to_use = self._prepare_dataset_for_training(dataset_path, dataset_info)
if dataset_to_use != dataset_path:
self._append_training_log(
f"Using RGB-converted dataset at {dataset_to_use.parent}"
)
self._append_training_log(f"Using RGB-converted dataset at {dataset_to_use.parent}")
params = self._collect_training_params()
stage_plan = self._compose_stage_plan(params)
params["stage_plan"] = stage_plan
total_planned_epochs = (
self._calculate_total_stage_epochs(stage_plan) or params["epochs"]
)
total_planned_epochs = self._calculate_total_stage_epochs(stage_plan) or params["epochs"]
params["total_planned_epochs"] = total_planned_epochs
self._active_training_params = params
self._training_cancelled = False
@@ -1490,9 +1455,7 @@ class TrainingTab(QWidget):
self._append_training_log("Two-stage fine-tuning schedule:")
self._log_stage_plan(stage_plan)
self._append_training_log(
f"Starting training run '{params['run_name']}' using {params['base_model']}"
)
self._append_training_log(f"Starting training run '{params['run_name']}' using {params['base_model']}")
self.training_progress_bar.setVisible(True)
self.training_progress_bar.setMaximum(max(1, total_planned_epochs))
@@ -1520,9 +1483,7 @@ class TrainingTab(QWidget):
def _stop_training(self):
if self.training_worker and self.training_worker.isRunning():
self._training_cancelled = True
self._append_training_log(
"Stop requested. Waiting for the current epoch to finish..."
)
self._append_training_log("Stop requested. Waiting for the current epoch to finish...")
self.training_worker.stop()
self.stop_training_button.setEnabled(False)
@@ -1558,9 +1519,7 @@ class TrainingTab(QWidget):
if worker.isRunning():
if not worker.wait(wait_timeout_ms):
logger.warning(
"Training worker did not finish within %sms", wait_timeout_ms
)
logger.warning("Training worker did not finish within %sms", wait_timeout_ms)
worker.deleteLater()
@@ -1577,16 +1536,12 @@ class TrainingTab(QWidget):
self._set_training_state(False)
self.training_progress_bar.setVisible(False)
def _on_training_progress(
self, current_epoch: int, total_epochs: int, metrics: Dict[str, Any]
):
def _on_training_progress(self, current_epoch: int, total_epochs: int, metrics: Dict[str, Any]):
self.training_progress_bar.setMaximum(total_epochs)
self.training_progress_bar.setValue(current_epoch)
parts = [f"Epoch {current_epoch}/{total_epochs}"]
if metrics:
metric_text = ", ".join(
f"{key}: {value:.4f}" for key, value in metrics.items()
)
metric_text = ", ".join(f"{key}: {value:.4f}" for key, value in metrics.items())
parts.append(metric_text)
self._append_training_log(" | ".join(parts))
@@ -1613,9 +1568,7 @@ class TrainingTab(QWidget):
f"Model trained but not registered: {exc}",
)
else:
QMessageBox.information(
self, "Training Complete", "Training finished successfully."
)
QMessageBox.information(self, "Training Complete", "Training finished successfully.")
def _on_training_error(self, message: str):
self._cleanup_training_worker()
@@ -1661,9 +1614,7 @@ class TrainingTab(QWidget):
metrics=results.get("metrics"),
)
self._append_training_log(
f"Registered model '{params['model_name']}' (ID {model_id}) at {model_path}"
)
self._append_training_log(f"Registered model '{params['model_name']}' (ID {model_id}) at {model_path}")
self._active_training_params = None
def _set_training_state(self, is_training: bool):
@@ -1706,9 +1657,7 @@ class TrainingTab(QWidget):
def _browse_save_dir(self):
start_path = self.save_dir_edit.text().strip() or "data/models"
directory = QFileDialog.getExistingDirectory(
self, "Select Save Directory", start_path
)
directory = QFileDialog.getExistingDirectory(self, "Select Save Directory", start_path)
if directory:
self.save_dir_edit.setText(directory)

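The TrainingWorker above sums epochs over a stage plan, and the two_stage block of the config supplies the per-stage parameters. A sketch of what that plan might look like and how the total is computed; the exact plan structure inside the app is an assumption, but the parameter values come from the config shown earlier in this diff:

# Sketch: staged fine-tune plan and the epoch total used to size the progress bar.
stage_plan = [
    {"name": "stage1-head-only", "params": {"epochs": 20, "lr0": 0.0005, "patience": 10, "freeze": 10}},
    {"name": "stage2-full-finetune", "params": {"epochs": 150, "lr0": 0.0003, "patience": 30}},
]

total_epochs = sum(
    max(0, int((stage.get("params") or {}).get("epochs", 0))) for stage in stage_plan
)
print(total_epochs)  # 170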
View File

@@ -18,7 +18,7 @@ from PySide6.QtGui import (
QPaintEvent,
QPolygonF,
)
from PySide6.QtCore import Qt, QEvent, Signal, QPoint, QPointF, QRect
from PySide6.QtCore import Qt, QEvent, Signal, QPoint, QPointF, QRect, QTimer
from typing import Any, Dict, List, Optional, Tuple
from src.utils.image import Image, ImageLoadError
@@ -79,9 +79,7 @@ def rdp(points: List[Tuple[float, float]], epsilon: float) -> List[Tuple[float,
return [start, end]
def simplify_polyline(
points: List[Tuple[float, float]], epsilon: float
) -> List[Tuple[float, float]]:
def simplify_polyline(points: List[Tuple[float, float]], epsilon: float) -> List[Tuple[float, float]]:
"""
Simplify a polyline with RDP while preserving closure semantics.
@@ -145,6 +143,10 @@ class AnnotationCanvasWidget(QWidget):
self.zoom_step = 0.1
self.zoom_wheel_step = 0.15
# Auto-fit behavior (opt-in): when enabled, newly loaded images (and resizes)
# will scale to fill the available viewport while preserving aspect ratio.
self._auto_fit_to_view: bool = False
# Drawing / interaction state
self.is_drawing = False
self.polyline_enabled = False
@@ -175,6 +177,35 @@ class AnnotationCanvasWidget(QWidget):
self._setup_ui()
def set_auto_fit_to_view(self, enabled: bool):
"""Enable/disable automatic zoom-to-fit behavior."""
self._auto_fit_to_view = bool(enabled)
if self._auto_fit_to_view and self.original_pixmap is not None:
QTimer.singleShot(0, self.fit_to_view)
def fit_to_view(self, padding_px: int = 6):
"""Zoom the image so it fits the scroll area's viewport (aspect preserved)."""
if self.original_pixmap is None:
return
viewport = self.scroll_area.viewport().size()
available_w = max(1, int(viewport.width()) - int(padding_px))
available_h = max(1, int(viewport.height()) - int(padding_px))
img_w = max(1, int(self.original_pixmap.width()))
img_h = max(1, int(self.original_pixmap.height()))
scale_w = available_w / img_w
scale_h = available_h / img_h
new_scale = min(scale_w, scale_h)
new_scale = max(self.zoom_min, min(self.zoom_max, float(new_scale)))
if abs(new_scale - self.zoom_scale) < 1e-4:
return
self.zoom_scale = new_scale
self._apply_zoom()
def _setup_ui(self):
"""Setup user interface."""
layout = QVBoxLayout()
@@ -187,9 +218,7 @@ class AnnotationCanvasWidget(QWidget):
self.canvas_label = QLabel("No image loaded")
self.canvas_label.setAlignment(Qt.AlignCenter)
self.canvas_label.setStyleSheet(
"QLabel { background-color: #2b2b2b; color: #888; }"
)
self.canvas_label.setStyleSheet("QLabel { background-color: #2b2b2b; color: #888; }")
self.canvas_label.setScaledContents(False)
self.canvas_label.setMouseTracking(True)
@@ -212,9 +241,18 @@ class AnnotationCanvasWidget(QWidget):
self.zoom_scale = 1.0
self.clear_annotations()
self._display_image()
logger.debug(
f"Loaded image into annotation canvas: {image.width}x{image.height}"
)
# Defer fit-to-view until the widget has a valid viewport size.
if self._auto_fit_to_view:
QTimer.singleShot(0, self.fit_to_view)
logger.debug(f"Loaded image into annotation canvas: {image.width}x{image.height}")
def resizeEvent(self, event):
"""Optionally keep the image fitted when the widget is resized."""
super().resizeEvent(event)
if self._auto_fit_to_view and self.original_pixmap is not None:
QTimer.singleShot(0, self.fit_to_view)
def clear(self):
"""Clear the displayed image and all annotations."""
@@ -250,12 +288,10 @@ class AnnotationCanvasWidget(QWidget):
# Get image data in a format compatible with Qt
if self.current_image.channels in (3, 4):
image_data = self.current_image.get_rgb()
height, width = image_data.shape[:2]
else:
image_data = self.current_image.get_grayscale()
height, width = image_data.shape
image_data = self.current_image.get_qt_rgb()
image_data = np.ascontiguousarray(image_data)
height, width = image_data.shape[:2]
bytes_per_line = image_data.strides[0]
qimage = QImage(
@@ -263,7 +299,7 @@ class AnnotationCanvasWidget(QWidget):
width,
height,
bytes_per_line,
self.current_image.qtimage_format,
QImage.Format_RGBX32FPx4, # self.current_image.qtimage_format,
).copy() # Copy so Qt owns the buffer even after numpy array goes out of scope
self.original_pixmap = QPixmap.fromImage(qimage)
@@ -291,22 +327,14 @@ class AnnotationCanvasWidget(QWidget):
scaled_width,
scaled_height,
Qt.KeepAspectRatio,
(
Qt.SmoothTransformation
if self.zoom_scale >= 1.0
else Qt.FastTransformation
),
(Qt.SmoothTransformation if self.zoom_scale >= 1.0 else Qt.FastTransformation),
)
scaled_annotations = self.annotation_pixmap.scaled(
scaled_width,
scaled_height,
Qt.KeepAspectRatio,
(
Qt.SmoothTransformation
if self.zoom_scale >= 1.0
else Qt.FastTransformation
),
(Qt.SmoothTransformation if self.zoom_scale >= 1.0 else Qt.FastTransformation),
)
# Composite image and annotations
@@ -392,16 +420,11 @@ class AnnotationCanvasWidget(QWidget):
y = (pos.y() - offset_y) / self.zoom_scale
# Check bounds
if (
0 <= x < self.original_pixmap.width()
and 0 <= y < self.original_pixmap.height()
):
if 0 <= x < self.original_pixmap.width() and 0 <= y < self.original_pixmap.height():
return (int(x), int(y))
return None
def _find_polyline_at(
self, img_x: float, img_y: float, threshold_px: float = 5.0
) -> Optional[int]:
def _find_polyline_at(self, img_x: float, img_y: float, threshold_px: float = 5.0) -> Optional[int]:
"""
Find index of polyline whose geometry is within threshold_px of (img_x, img_y).
Returns the index in self.polylines, or None if none is close enough.
@@ -423,9 +446,7 @@ class AnnotationCanvasWidget(QWidget):
# Precise distance to all segments
for (x1, y1), (x2, y2) in zip(polyline[:-1], polyline[1:]):
d = perpendicular_distance(
(img_x, img_y), (float(x1), float(y1)), (float(x2), float(y2))
)
d = perpendicular_distance((img_x, img_y), (float(x1), float(y1)), (float(x2), float(y2)))
if d < best_dist:
best_dist = d
best_index = idx
@@ -626,11 +647,7 @@ class AnnotationCanvasWidget(QWidget):
def mouseMoveEvent(self, event: QMouseEvent):
"""Handle mouse move events for drawing."""
if (
not self.is_drawing
or not self.polyline_enabled
or self.annotation_pixmap is None
):
if not self.is_drawing or not self.polyline_enabled or self.annotation_pixmap is None:
super().mouseMoveEvent(event)
return
@@ -690,15 +707,10 @@ class AnnotationCanvasWidget(QWidget):
if len(simplified) >= 2:
# Store polyline and redraw all annotations
self._add_polyline(
simplified, self.polyline_pen_color, self.polyline_pen_width
)
self._add_polyline(simplified, self.polyline_pen_color, self.polyline_pen_width)
# Convert to normalized coordinates for metadata + signal
normalized_stroke = [
self._image_to_normalized_coords(int(x), int(y))
for (x, y) in simplified
]
normalized_stroke = [self._image_to_normalized_coords(int(x), int(y)) for (x, y) in simplified]
self.all_strokes.append(
{
"points": normalized_stroke,
@@ -711,8 +723,7 @@ class AnnotationCanvasWidget(QWidget):
# Emit signal with normalized coordinates
self.annotation_drawn.emit(normalized_stroke)
logger.debug(
f"Completed stroke with {len(simplified)} points "
f"(normalized len={len(normalized_stroke)})"
f"Completed stroke with {len(simplified)} points " f"(normalized len={len(normalized_stroke)})"
)
self.current_stroke = []
@@ -752,9 +763,7 @@ class AnnotationCanvasWidget(QWidget):
# Store polyline as [y_norm, x_norm] to match DB convention and
# the expectations of draw_saved_polyline().
normalized_polyline = [
[y / img_height, x / img_width] for (x, y) in polyline
]
normalized_polyline = [[y / img_height, x / img_width] for (x, y) in polyline]
logger.debug(
f"Polyline {idx}: {len(polyline)} points, "
@@ -774,7 +783,7 @@ class AnnotationCanvasWidget(QWidget):
self,
polyline: List[List[float]],
color: str,
width: int = 3,
width: int = 1,
annotation_id: Optional[int] = None,
):
"""
@@ -812,17 +821,13 @@ class AnnotationCanvasWidget(QWidget):
# Store and redraw using common pipeline
pen_color = QColor(color)
pen_color.setAlpha(128) # Add semi-transparency
pen_color.setAlpha(255) # Add semi-transparency
self._add_polyline(img_coords, pen_color, width, annotation_id=annotation_id)
# Store in all_strokes for consistency (uses normalized coordinates)
self.all_strokes.append(
{"points": polyline, "color": color, "alpha": 128, "width": width}
)
self.all_strokes.append({"points": polyline, "color": color, "alpha": 255, "width": width})
logger.debug(
f"Drew saved polyline with {len(polyline)} points in color {color}"
)
logger.debug(f"Drew saved polyline with {len(polyline)} points in color {color}")
def draw_saved_bbox(
self,
@@ -846,9 +851,7 @@ class AnnotationCanvasWidget(QWidget):
return
if len(bbox) != 4:
logger.warning(
f"Invalid bounding box format: expected 4 values, got {len(bbox)}"
)
logger.warning(f"Invalid bounding box format: expected 4 values, got {len(bbox)}")
return
# Convert normalized coordinates to image coordinates (for logging/debug)
@@ -869,15 +872,11 @@ class AnnotationCanvasWidget(QWidget):
# in _redraw_annotations() together with all polylines.
pen_color = QColor(color)
pen_color.setAlpha(128) # Add semi-transparency
self.bboxes.append(
[float(x_min_norm), float(y_min_norm), float(x_max_norm), float(y_max_norm)]
)
self.bboxes.append([float(x_min_norm), float(y_min_norm), float(x_max_norm), float(y_max_norm)])
self.bbox_meta.append({"color": pen_color, "width": int(width), "label": label})
# Store in all_strokes for consistency
self.all_strokes.append(
{"bbox": bbox, "color": color, "alpha": 128, "width": width, "label": label}
)
self.all_strokes.append({"bbox": bbox, "color": color, "alpha": 128, "width": width, "label": label})
# Redraw overlay (polylines + all bounding boxes)
self._redraw_annotations()

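The canvas now builds its QImage from a float32 RGBX buffer (QImage.Format_RGBX32FPx4) returned by get_qt_rgb(). A minimal standalone sketch of that conversion path, assuming PySide6 on Qt 6.2 or newer and pixel values already scaled to 0..1:

# Sketch only: float32 HxWx4 buffer -> QImage -> QPixmap, mirroring the widget above.
import numpy as np
from PySide6.QtGui import QImage, QPixmap

rgba = np.random.rand(256, 256, 4).astype(np.float32)  # placeholder image data
rgba[..., 3] = 1.0                      # the X channel is ignored by RGBX formats
rgba = np.ascontiguousarray(rgba)

qimage = QImage(
    rgba.data,
    rgba.shape[1],                      # width
    rgba.shape[0],                      # height
    rgba.strides[0],                    # bytes per line (width * 4 channels * 4 bytes)
    QImage.Format_RGBX32FPx4,
).copy()                                # copy so Qt owns the buffer
pixmap = QPixmap.fromImage(qimage)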
View File

@@ -1,16 +1,21 @@
"""
YOLO model wrapper for the microscopy object detection application.
Provides a clean interface to YOLOv8 for training, validation, and inference.
"""YOLO model wrapper for the microscopy object detection application.
Notes on 16-bit TIFF support:
- Ultralytics training defaults assume 8-bit images and normalize by dividing by 255.
- This project can patch Ultralytics at runtime to decode TIFFs via `tifffile` and
normalize `uint16` correctly.
See [`apply_ultralytics_16bit_tiff_patches()`](src/utils/ultralytics_16bit_patch.py:1).
"""
from ultralytics import YOLO
from pathlib import Path
from typing import Optional, List, Dict, Callable, Any
import torch
import tempfile
import os
from src.utils.image import Image, convert_grayscale_to_rgb_preserve_range
from src.utils.image import Image
from src.utils.logger import get_logger
from src.utils.ultralytics_16bit_patch import apply_ultralytics_16bit_tiff_patches
logger = get_logger(__name__)
@@ -31,6 +36,9 @@ class YOLOWrapper:
self.device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"YOLOWrapper initialized with device: {self.device}")
# Apply Ultralytics runtime patches early (before first import/instantiation of YOLO datasets/trainers).
apply_ultralytics_16bit_tiff_patches()
def load_model(self) -> bool:
"""
Load YOLO model from path.
@@ -40,6 +48,9 @@ class YOLOWrapper:
"""
try:
logger.info(f"Loading YOLO model from {self.model_path}")
# Import YOLO lazily to ensure runtime patches are applied first.
from ultralytics import YOLO
self.model = YOLO(self.model_path)
self.model.to(self.device)
logger.info("Model loaded successfully")
@@ -85,9 +96,17 @@ class YOLOWrapper:
try:
logger.info(f"Starting training: {name}")
logger.info(
f"Data: {data_yaml}, Epochs: {epochs}, Batch: {batch}, ImgSz: {imgsz}"
)
logger.info(f"Data: {data_yaml}, Epochs: {epochs}, Batch: {batch}, ImgSz: {imgsz}")
# Defaults for 16-bit safety: disable augmentations that force uint8 and HSV ops that assume 0..255.
# Users can override by passing explicit kwargs.
kwargs.setdefault("mosaic", 0.0)
kwargs.setdefault("mixup", 0.0)
kwargs.setdefault("cutmix", 0.0)
kwargs.setdefault("copy_paste", 0.0)
kwargs.setdefault("hsv_h", 0.0)
kwargs.setdefault("hsv_s", 0.0)
kwargs.setdefault("hsv_v", 0.0)
# Train the model
results = self.model.train(
@@ -128,9 +147,7 @@ class YOLOWrapper:
try:
logger.info(f"Starting validation on {split} split")
results = self.model.val(
data=data_yaml, split=split, device=self.device, **kwargs
)
results = self.model.val(data=data_yaml, split=split, device=self.device, **kwargs)
logger.info("Validation completed successfully")
return self._format_validation_results(results)
@@ -169,17 +186,18 @@ class YOLOWrapper:
raise RuntimeError(f"Failed to load model from {self.model_path}")
prepared_source, cleanup_path = self._prepare_source(source)
imgsz = 1088
try:
logger.info(f"Running inference on {source}")
logger.info(f"Running inference on {source} -> prepared_source {prepared_source}")
results = self.model.predict(
source=prepared_source,
source=source,
conf=conf,
iou=iou,
save=save,
save_txt=save_txt,
save_conf=save_conf,
device=self.device,
imgsz=imgsz,
**kwargs,
)
@@ -195,13 +213,9 @@ class YOLOWrapper:
try:
os.remove(cleanup_path)
except OSError as cleanup_error:
logger.warning(
f"Failed to delete temporary RGB image {cleanup_path}: {cleanup_error}"
)
logger.warning(f"Failed to delete temporary RGB image {cleanup_path}: {cleanup_error}")
def export(
self, format: str = "onnx", output_path: Optional[str] = None, **kwargs
) -> str:
def export(self, format: str = "onnx", output_path: Optional[str] = None, **kwargs) -> str:
"""
Export model to different format.
@@ -236,21 +250,13 @@ class YOLOWrapper:
if source_path.is_file():
try:
img_obj = Image(source_path)
pil_img = img_obj.pil_image
if len(pil_img.getbands()) == 1:
rgb_img = convert_grayscale_to_rgb_preserve_range(pil_img)
else:
rgb_img = pil_img.convert("RGB")
suffix = source_path.suffix or ".png"
tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
tmp_path = tmp.name
tmp.close()
rgb_img.save(tmp_path)
img_obj.save(tmp_path)
cleanup_path = tmp_path
logger.info(
f"Converted image {source_path} to RGB for inference at {tmp_path}"
)
logger.info(f"Converted image {source_path} to RGB for inference at {tmp_path}")
return tmp_path, cleanup_path
except Exception as convert_error:
logger.warning(
@@ -263,9 +269,7 @@ class YOLOWrapper:
"""Format training results into dictionary."""
try:
# Get the results dict
results_dict = (
results.results_dict if hasattr(results, "results_dict") else {}
)
results_dict = results.results_dict if hasattr(results, "results_dict") else {}
formatted = {
"success": True,
@@ -298,9 +302,7 @@ class YOLOWrapper:
"mAP50-95": float(box_metrics.map),
"precision": float(box_metrics.mp),
"recall": float(box_metrics.mr),
"fitness": (
float(results.fitness) if hasattr(results, "fitness") else 0.0
),
"fitness": (float(results.fitness) if hasattr(results, "fitness") else 0.0),
}
# Add per-class metrics if available
@@ -310,11 +312,7 @@ class YOLOWrapper:
if idx < len(box_metrics.ap):
class_metrics[name] = {
"ap": float(box_metrics.ap[idx]),
"ap50": (
float(box_metrics.ap50[idx])
if hasattr(box_metrics, "ap50")
else 0.0
),
"ap50": (float(box_metrics.ap50[idx]) if hasattr(box_metrics, "ap50") else 0.0),
}
formatted["class_metrics"] = class_metrics
@@ -347,21 +345,15 @@ class YOLOWrapper:
"class_id": int(boxes.cls[i]),
"class_name": result.names[int(boxes.cls[i])],
"confidence": float(boxes.conf[i]),
"bbox_normalized": [
float(v) for v in xyxyn
], # [x_min, y_min, x_max, y_max]
"bbox_absolute": [
float(v) for v in boxes.xyxy[i].cpu().numpy()
], # Absolute pixels
"bbox_normalized": [float(v) for v in xyxyn], # [x_min, y_min, x_max, y_max]
"bbox_absolute": [float(v) for v in boxes.xyxy[i].cpu().numpy()], # Absolute pixels
}
# Extract segmentation mask if available
if has_masks:
try:
# Get the mask for this detection
mask_data = result.masks.xy[
i
] # Polygon coordinates in absolute pixels
mask_data = result.masks.xy[i] # Polygon coordinates in absolute pixels
# Convert to normalized coordinates
if len(mask_data) > 0:
@@ -374,9 +366,7 @@ class YOLOWrapper:
else:
detection["segmentation_mask"] = None
except Exception as mask_error:
logger.warning(
f"Error extracting mask for detection {i}: {mask_error}"
)
logger.warning(f"Error extracting mask for detection {i}: {mask_error}")
detection["segmentation_mask"] = None
else:
detection["segmentation_mask"] = None
@@ -390,9 +380,7 @@ class YOLOWrapper:
return []
@staticmethod
def convert_bbox_format(
bbox: List[float], format_from: str = "xywh", format_to: str = "xyxy"
) -> List[float]:
def convert_bbox_format(bbox: List[float], format_from: str = "xywh", format_to: str = "xyxy") -> List[float]:
"""
Convert bounding box between formats.

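The augmentation defaults and runtime patches above exist because Ultralytics' stock pipeline assumes 8-bit input and normalizes by dividing by 255, which overflows the 0..1 range for uint16 microscopy TIFFs. An illustrative normalization sketch (not the project's actual patch code):

# Sketch: scale by the dtype maximum instead of a hard-coded 255.
import numpy as np

def normalize_for_training(img: np.ndarray) -> np.ndarray:
    """Scale an image to float32 in [0, 1] based on its integer dtype."""
    if np.issubdtype(img.dtype, np.integer):
        denom = float(np.iinfo(img.dtype).max)   # 255 for uint8, 65535 for uint16
    else:
        denom = max(float(img.max()), 1.0)
    return img.astype(np.float32) / denom

u16 = (np.random.rand(4, 4) * 65535).astype(np.uint16)
print(normalize_for_training(u16).max() <= 1.0)   # True
print((u16 / 255.0).max())                         # ~257, the 8-bit assumption breaks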
View File

@@ -54,7 +54,7 @@ class ConfigManager:
"models_directory": "data/models",
"base_model_choices": [
"yolov8s-seg.pt",
"yolov11s-seg.pt",
"yolo11s-seg.pt",
],
},
"training": {
@@ -225,6 +225,4 @@ class ConfigManager:
def get_allowed_extensions(self) -> list:
"""Get list of allowed image file extensions."""
return self.get(
"image_repository.allowed_extensions", Image.SUPPORTED_EXTENSIONS
)
return self.get("image_repository.allowed_extensions", Image.SUPPORTED_EXTENSIONS)

View File

@@ -6,16 +6,55 @@ import cv2
import numpy as np
from pathlib import Path
from typing import Optional, Tuple, Union
from PIL import Image as PILImage
from src.utils.logger import get_logger
from src.utils.file_utils import validate_file_path, is_image_file
from PySide6.QtGui import QImage
from tifffile import imread, imwrite
logger = get_logger(__name__)
def get_pseudo_rgb(arr: np.ndarray, gamma: float = 0.5) -> np.ndarray:
"""
Convert a grayscale image to a pseudo-RGB image using a gamma correction.
Args:
arr: Input grayscale image as numpy array
Returns:
Pseudo-RGB image as numpy array
"""
if arr.ndim != 2:
raise ValueError("Input array must be a grayscale image with shape (H, W)")
a1 = arr.copy().astype(np.float32)
a1 -= np.percentile(a1, 2)
a1[a1 < 0] = 0
p999 = np.percentile(a1, 99.9)
a1[a1 > p999] = p999
a1 /= a1.max()
if 1:
a2 = a1.copy()
a2 = a2**gamma
a2 /= a2.max()
a3 = a1.copy()
p9999 = np.percentile(a3, 99.99)
a3[a3 > p9999] = p9999
a3 /= a3.max()
# return np.stack([a1, np.zeros(a1.shape), np.zeros(a1.shape)], axis=0)
# return np.stack([a2, np.zeros(a1.shape), np.zeros(a1.shape)], axis=0)
out = np.stack([a1, a2, a3], axis=0)
# print(any(np.isnan(out).flatten()))
return out
class ImageLoadError(Exception):
"""Exception raised when an image cannot be loaded."""
@@ -54,7 +93,6 @@ class Image:
"""
self.path = Path(image_path)
self._data: Optional[np.ndarray] = None
self._pil_image: Optional[PILImage.Image] = None
self._width: int = 0
self._height: int = 0
self._channels: int = 0
@@ -80,40 +118,39 @@ class Image:
if not is_image_file(str(self.path), self.SUPPORTED_EXTENSIONS):
ext = self.path.suffix.lower()
raise ImageLoadError(
f"Unsupported image format: {ext}. "
f"Supported formats: {', '.join(self.SUPPORTED_EXTENSIONS)}"
f"Unsupported image format: {ext}. " f"Supported formats: {', '.join(self.SUPPORTED_EXTENSIONS)}"
)
try:
# Load with OpenCV (returns BGR format)
self._data = cv2.imread(str(self.path), cv2.IMREAD_UNCHANGED)
if self.path.suffix.lower() in [".tif", ".tiff"]:
self._data = imread(str(self.path))
else:
# raise NotImplementedError("RGB is not implemented")
# Load with OpenCV (returns BGR format)
self._data = cv2.imread(str(self.path), cv2.IMREAD_UNCHANGED)
if self._data is None:
raise ImageLoadError(f"Failed to load image with OpenCV: {self.path}")
# Extract metadata
self._height, self._width = self._data.shape[:2]
self._channels = self._data.shape[2] if len(self._data.shape) == 3 else 1
# print(self._data.shape)
if len(self._data.shape) == 2:
self._height, self._width = self._data.shape[:2]
self._channels = 1
else:
self._height, self._width = self._data.shape[1:]
self._channels = self._data.shape[0]
# self._channels = self._data.shape[2] if len(self._data.shape) == 3 else 1
self._format = self.path.suffix.lower().lstrip(".")
self._size_bytes = self.path.stat().st_size
self._dtype = self._data.dtype
# Load PIL version for compatibility (convert BGR to RGB)
if self._channels == 3:
rgb_data = cv2.cvtColor(self._data, cv2.COLOR_BGR2RGB)
self._pil_image = PILImage.fromarray(rgb_data)
elif self._channels == 4:
rgba_data = cv2.cvtColor(self._data, cv2.COLOR_BGRA2RGBA)
self._pil_image = PILImage.fromarray(rgba_data)
else:
# Grayscale
self._pil_image = PILImage.fromarray(self._data)
logger.info(
f"Successfully loaded image: {self.path.name} "
f"({self._width}x{self._height}, {self._channels} channels, "
f"{self._format.upper()})"
)
if 0:
logger.info(
f"Successfully loaded image: {self.path.name} "
f"({self._width}x{self._height}, {self._channels} channels, "
f"{self._format.upper()})"
)
except Exception as e:
logger.error(f"Error loading image {self.path}: {e}")
@@ -131,18 +168,6 @@ class Image:
raise ImageLoadError("Image data not available")
return self._data
@property
def pil_image(self) -> PILImage.Image:
"""
Get image data as PIL Image (RGB or grayscale).
Returns:
PIL Image object
"""
if self._pil_image is None:
raise ImageLoadError("PIL image not available")
return self._pil_image
@property
def width(self) -> int:
"""Get image width in pixels."""
@@ -187,6 +212,7 @@ class Image:
@property
def dtype(self) -> np.dtype:
"""Get the data type of the image array."""
if self._dtype is None:
raise ImageLoadError("Image dtype not available")
return self._dtype
@@ -206,8 +232,10 @@ class Image:
elif self._channels == 1:
if self._dtype == np.uint16:
return QImage.Format_Grayscale16
else:
elif self._dtype == np.uint8:
return QImage.Format_Grayscale8
elif self._dtype == np.float32:
return QImage.Format_BGR30
else:
raise ImageLoadError(f"Unsupported number of channels: {self._channels}")
@@ -218,12 +246,36 @@ class Image:
Returns:
Image data in RGB format as numpy array
"""
if self._channels == 3:
return cv2.cvtColor(self._data, cv2.COLOR_BGR2RGB)
if self.channels == 1:
img = get_pseudo_rgb(self.data)
self._dtype = img.dtype
return img, True
elif self._channels == 3:
return cv2.cvtColor(self._data, cv2.COLOR_BGR2RGB), False
elif self._channels == 4:
return cv2.cvtColor(self._data, cv2.COLOR_BGRA2RGBA)
return cv2.cvtColor(self._data, cv2.COLOR_BGRA2RGBA), False
else:
return self._data
raise NotImplementedError
# else:
# return self._data
def get_qt_rgb(self) -> np.ascontiguousarray:
# we keep data as (C, H, W)
_img, pseudo = self.get_rgb()
if pseudo:
img = np.zeros((self.height, self.width, 4), dtype=np.float32)
img[..., 0] = _img[0] # R gradient
img[..., 1] = _img[1] # G gradient
img[..., 2] = _img[2] # B constant
img[..., 3] = 1.0 # A = 1.0 (opaque)
return np.ascontiguousarray(img)
else:
return np.ascontiguousarray(_img)
def get_grayscale(self) -> np.ndarray:
"""
@@ -277,11 +329,26 @@ class Image:
"""
return self._channels >= 3
def save(self, path: Union[str, Path], pseudo_rgb: bool = True) -> None:
if self.channels == 1:
if pseudo_rgb:
img = get_pseudo_rgb(self.data)
print("Image.save", img.shape)
else:
img = np.repeat(self.data, 3, axis=2)
else:
raise NotImplementedError("Only grayscale images are supported for now.")
imwrite(path, data=img)
def __repr__(self) -> str:
"""String representation of the Image object."""
return (
f"Image(path='{self.path.name}', "
f"shape=({self._width}x{self._height}x{self._channels}), "
# Display as HxWxC to match the conventional NumPy shape semantics.
f"shape=({self._height}x{self._width}x{self._channels}), "
f"format={self._format}, "
f"size={self.size_mb:.2f}MB)"
)
@@ -291,38 +358,13 @@ class Image:
return self.__repr__()
def convert_grayscale_to_rgb_preserve_range(
pil_image: PILImage.Image,
) -> PILImage.Image:
"""Convert a single-channel PIL image to RGB while preserving dynamic range.
if __name__ == "__main__":
import argparse
Args:
pil_image: Single-channel PIL image (e.g., 16-bit grayscale).
parser = argparse.ArgumentParser()
parser.add_argument("--path", type=str, required=True)
args = parser.parse_args()
Returns:
PIL Image in RGB mode with intensities normalized to 0-255.
"""
if pil_image.mode == "RGB":
return pil_image
grayscale = np.array(pil_image)
if grayscale.ndim == 3:
grayscale = grayscale[:, :, 0]
original_dtype = grayscale.dtype
grayscale = grayscale.astype(np.float32)
if grayscale.size == 0:
return PILImage.new("RGB", pil_image.size, color=(0, 0, 0))
if np.issubdtype(original_dtype, np.integer):
denom = float(max(np.iinfo(original_dtype).max, 1))
else:
max_val = float(grayscale.max())
denom = max(max_val, 1.0)
grayscale = np.clip(grayscale / denom, 0.0, 1.0)
grayscale_u8 = (grayscale * 255.0).round().astype(np.uint8)
rgb_arr = np.repeat(grayscale_u8[:, :, None], 3, axis=2)
return PILImage.fromarray(rgb_arr, mode="RGB")
img = Image(args.path)
img.save(args.path + "test.tif")
print(img)

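A hedged usage sketch for the pseudo-RGB display path added above: read a grayscale TIFF with tifffile, stretch it with get_pseudo_rgb, and reorder the channel-first result to HxWx3 for display. The file name is an assumption; the function and its output layout come from the diff:

# Sketch: grayscale uint16 frame -> percentile/gamma-stretched pseudo-RGB.
import numpy as np
from tifffile import imread
from src.utils.image import get_pseudo_rgb

gray = imread("sample_uint16.tif")              # (H, W) grayscale frame, path assumed
pseudo = get_pseudo_rgb(gray, gamma=0.5)        # (3, H, W) float32 in [0, 1]
display = np.transpose(pseudo, (1, 2, 0))       # (H, W, 3) for matplotlib/Qt display
print(display.shape, display.min(), display.max())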
View File

@@ -12,23 +12,38 @@ class UT:
Operetta files along with rois drawn in ImageJ
"""
def __init__(self, roifile_fn: Path):
def __init__(self, roifile_fn: Path, no_labels: bool):
self.roifile_fn = roifile_fn
self.rois = ImagejRoi.fromfile(self.roifile_fn)
self.stem = self.roifile_fn.stem.strip("-RoiSet")
print("is file", self.roifile_fn.is_file())
self.rois = None
if no_labels:
self.rois = ImagejRoi.fromfile(self.roifile_fn)
print(self.roifile_fn.stem)
print(self.roifile_fn.parent.parts[-1])
if "Roi-" in self.roifile_fn.stem:
self.stem = self.roifile_fn.stem.split("Roi-")[1]
else:
self.stem = self.roifile_fn.parent.parts[-1]
else:
self.roifile_fn = roifile_fn / roifile_fn.parts[-1]
self.stem = self.roifile_fn.stem
print(self.roifile_fn)
print(self.stem)
self.image, self.image_props = self._load_images()
def _load_images(self):
"""Loading sequence of tif files
array sequence is CZYX
"""
print(self.roifile_fn.parent, self.stem)
fns = list(self.roifile_fn.parent.glob(f"{self.stem}*.tif*"))
print("Loading images:", self.roifile_fn.parent, self.stem)
fns = list(self.roifile_fn.parent.glob(f"{self.stem.lower()}*.tif*"))
stems = [fn.stem.split(self.stem)[-1] for fn in fns]
n_ch = len(set([stem.split("-ch")[-1].split("t")[0] for stem in stems]))
n_p = len(set([stem.split("-")[0] for stem in stems]))
n_t = len(set([stem.split("t")[1] for stem in stems]))
print(n_ch, n_p, n_t)
with TiffFile(fns[0]) as tif:
img = tif.asarray()
@@ -42,6 +57,7 @@ class UT:
"height": h,
"dtype": dtype,
}
print("Image props", self.image_props)
image_stack = np.zeros((n_ch, n_p, w, h), dtype=dtype)
for fn in fns:
@@ -49,7 +65,7 @@ class UT:
img = tif.asarray()
stem = fn.stem.split(self.stem)[-1]
ch = int(stem.split("-ch")[-1].split("t")[0])
p = int(stem.split("-")[0].lstrip("p"))
p = int(stem.split("-")[0].split("p")[1])
t = int(stem.split("t")[1])
print(fn.stem, "ch", ch, "p", p, "t", t)
image_stack[ch - 1, p - 1] = img
@@ -82,11 +98,20 @@ class UT:
):
"""Export rois to a file"""
with open(path / subfolder / f"{self.stem}.txt", "w") as f:
for roi in self.rois:
# TODO add image coordinates normalization
coords = ""
for x, y in roi.subpixel_coordinates:
coords += f"{x/self.width} {y/self.height}"
for i, roi in enumerate(self.rois):
rc = roi.subpixel_coordinates
if rc is None:
print(f"No coordinates: {self.roifile_fn}, element {i}, out of {len(self.rois)}")
continue
xmn, ymn = rc.min(axis=0)
xmx, ymx = rc.max(axis=0)
xc = (xmn + xmx) / 2
yc = (ymn + ymx) / 2
bw = xmx - xmn
bh = ymx - ymn
coords = f"{xc/self.width} {yc/self.height} {bw/self.width} {bh/self.height} "
for x, y in rc:
coords += f"{x/self.width} {y/self.height} "
f.write(f"{class_index} {coords}\n")
return
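A worked example of the line format written above, using a made-up three-point ROI on a 100x100 image with class index 0.
import numpy as np
rc = np.array([[10.0, 20.0], [30.0, 20.0], [20.0, 40.0]])  # hypothetical roi.subpixel_coordinates
W = H = 100.0
xmn, ymn = rc.min(axis=0)
xmx, ymx = rc.max(axis=0)
line = (
    f"0 {(xmn + xmx) / 2 / W} {(ymn + ymx) / 2 / H} {(xmx - xmn) / W} {(ymx - ymn) / H} "
    + " ".join(f"{x / W} {y / H}" for x, y in rc)
)
# -> "0 0.2 0.3 0.2 0.2 0.1 0.2 0.3 0.2 0.2 0.4"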
@@ -104,6 +129,7 @@ class UT:
self.image = np.max(self.image[channel], axis=0)
print(self.image.shape)
print(path / subfolder / f"{self.stem}.tif")
with TiffWriter(path / subfolder / f"{self.stem}.tif") as tif:
tif.write(self.image)
@@ -112,11 +138,31 @@ if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("input", type=Path)
parser.add_argument("output", type=Path)
parser.add_argument("-i", "--input", nargs="*", type=Path)
parser.add_argument("-o", "--output", type=Path)
parser.add_argument(
"--no-labels",
action="store_false",
help="Source does not have labels, export only images",
)
args = parser.parse_args()
for rfn in args.input.glob("*.zip"):
ut = UT(rfn)
ut.export_rois(args.output, class_index=0)
ut.export_image(args.output, plane_mode="max projection", channel=0)
# print(args)
# aa
for path in args.input:
print("Path:", path)
if not args.no_labels:
print("No labels")
ut = UT(path, args.no_labels)
ut.export_image(args.output, plane_mode="max projection", channel=0)
else:
for rfn in Path(path).glob("*.zip"):
# if Path(path).suffix == ".zip":
print("Roi FN:", rfn)
ut = UT(rfn, args.no_labels)
ut.export_rois(args.output, class_index=0)
ut.export_image(args.output, plane_mode="max projection", channel=0)
print()

368
src/utils/image_splitter.py Normal file
View File

@@ -0,0 +1,368 @@
import numpy as np
from pathlib import Path
from tifffile import imread, imwrite
from shapely.geometry import LineString
from copy import deepcopy
from scipy.ndimage import zoom
# debug
from src.utils.image import Image
from show_yolo_seg import draw_annotations
import pylab as plt
import cv2
class Label:
def __init__(self, yolo_annotation: str):
class_id, bbox, polygon = self.parse_yolo_annotation(yolo_annotation)
self.class_id = class_id
self.bbox = bbox
self.polygon = polygon
def parse_yolo_annotation(self, yolo_annotation: str):
class_id, *coords = yolo_annotation.split()
class_id = int(class_id)
bbox = np.array(coords[:4], dtype=np.float32)
polygon = np.array(coords[4:], dtype=np.float32).reshape(-1, 2) if len(coords) > 4 else None
if polygon is not None and not np.allclose(polygon[0], polygon[-1]):
polygon = np.vstack([polygon, polygon[0]])
return class_id, bbox, polygon
def offset_label(
self,
img_w,
img_h,
distance: float = 1.0,
cap_style: int = 2,
join_style: int = 2,
):
if self.polygon is None:
self.bbox = np.array(
[
self.bbox[0] - distance if self.bbox[0] - distance > 0 else 0,
self.bbox[1] - distance if self.bbox[1] - distance > 0 else 0,
self.bbox[2] + distance if self.bbox[2] + distance < 1 else 1,
self.bbox[3] + distance if self.bbox[3] + distance < 1 else 1,
],
dtype=np.float32,
)
return self.bbox
def coords_are_normalized(coords):
# If every coordinate is between 0 and 1 (inclusive-ish), assume normalized
print(coords)
# if not coords:
# return False
return float(np.asarray(coords).flatten().max()) <= 1.001
def poly_to_pts(coords, img_w, img_h):
# coords: (N, 2) array of normalized polygon vertices; scale to absolute pixel points
coords = np.asarray(coords, dtype=np.float32) * (img_w, img_h)
pts = coords.round().astype(np.int32)
return pts
pts = poly_to_pts(self.polygon, img_w, img_h)
line = LineString(pts)
# Buffer distance in pixels
buffered = line.buffer(distance=distance, cap_style=cap_style, join_style=join_style)
self.polygon = np.array(buffered.exterior.coords, dtype=np.float32) / (img_w, img_h)
xmn, ymn = self.polygon.min(axis=0)
xmx, ymx = self.polygon.max(axis=0)
xc = (xmn + xmx) / 2
yc = (ymn + ymx) / 2
bw = xmx - xmn
bh = ymx - ymn
self.bbox = np.array([xc, yc, bw, bh], dtype=np.float32)
return self.bbox, self.polygon
def translate(self, x, y, scale_x, scale_y):
self.bbox[0] -= x
self.bbox[0] *= scale_x
self.bbox[1] -= y
self.bbox[1] *= scale_y
self.bbox[2] *= scale_x
self.bbox[3] *= scale_y
if self.polygon is not None:
self.polygon[:, 0] -= x
self.polygon[:, 0] *= scale_x
self.polygon[:, 1] -= y
self.polygon[:, 1] *= scale_y
def in_range(self, hrange, wrange):
xc, yc, w, h = self.bbox  # YOLO bbox order: centre x, centre y, width, height
x1 = xc - w / 2
y1 = yc - h / 2
x2 = xc + w / 2
y2 = yc + h / 2
truth_val = (
xc >= wrange[0]
and x1 <= wrange[1]
and x2 >= wrange[0]
and x2 <= wrange[1]
and y1 >= hrange[0]
and y1 <= hrange[1]
and y2 >= hrange[0]
and y2 <= hrange[1]
)
print(x1, x2, wrange, y1, y2, hrange, truth_val)
return truth_val
def to_string(self, bbox: list = None, polygon: list = None):
coords = ""
if bbox is None:
bbox = self.bbox
# coords += " ".join([f"{x:.6f}" for x in self.bbox])
if polygon is None:
polygon = self.polygon
if polygon is not None:
coords += " " + " ".join([f"{x:.6f} {y:.6f}" for x, y in polygon])
return f"{self.class_id} {coords}"
def __str__(self):
return f"Class: {self.class_id}, BBox: {self.bbox}, Polygon: {self.polygon}"
class YoloLabelReader:
def __init__(self, label_path: Path):
self.label_path = label_path
self.labels = self._read_labels()
def _read_labels(self):
with open(self.label_path, "r") as f:
labels = [Label(line) for line in f.readlines()]
return labels
def get_labels(self, hrange, wrange):
"""hrange and wrange are tuples of (start, end) normalized to [0, 1]"""
labels = []
# print(hrange, wrange)
for lbl in self.labels:
# print(lbl)
if lbl.in_range(hrange, wrange):
labels.append(lbl)
return labels if len(labels) > 0 else None
def __getitem__(self, index):
return self.labels[index]
def __len__(self):
return len(self.labels)
def __iter__(self):
return iter(self.labels)
class ImageSplitter:
def __init__(self, image_path: Path, label_path: Path):
self.image = imread(image_path)
self.image_path = image_path
self.label_path = label_path
if not label_path.exists():
print(f"Label file {label_path} not found")
self.labels = None
else:
self.labels = YoloLabelReader(label_path)
def split_into_tiles(self, patch_size: tuple = (2, 2)):
"""Split image into patches of size patch_size"""
hstep, wstep = (
self.image.shape[0] // patch_size[0],
self.image.shape[1] // patch_size[1],
)
h, w = self.image.shape[:2]
for i in range(patch_size[0]):
for j in range(patch_size[1]):
metadata = {
"image_path": str(self.image_path),
"label_path": str(self.label_path),
"tile_section": f"{i}, {j}",
"tile_size": f"{hstep}, {wstep}",
"patch_size": f"{patch_size[0]}, {patch_size[1]}",
}
tile_reference = f"i{i}j{j}"
hrange = (i * hstep / h, (i + 1) * hstep / h)
wrange = (j * wstep / w, (j + 1) * wstep / w)
tile = self.image[i * hstep : (i + 1) * hstep, j * wstep : (j + 1) * wstep]
labels = None
if self.labels is not None:
labels = deepcopy(self.labels.get_labels(hrange, wrange))
print(id(labels))
if labels is not None:
print(hrange[0], wrange[0])
for l in labels:
print(l.bbox)
[l.translate(wrange[0], hrange[0], patch_size[1], patch_size[0]) for l in labels]
print("translated")
for l in labels:
print(l.bbox)
# print(labels)
yield tile_reference, tile, labels, metadata
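For orientation, a worked example of the normalized bookkeeping above (numbers invented).
# With a 2x2 split of a 1000x1200 image, tile (i=0, j=1) has
# hrange = (0.0, 0.5) and wrange = (0.5, 1.0); a label centred at
# normalized xc = 0.7, yc = 0.2 in the full image maps to
#   xc' = (0.7 - 0.5) * 2 = 0.4,  yc' = (0.2 - 0.0) * 2 = 0.4
# inside the tile, which is what translate(wrange[0], hrange[0],
# patch_size[1], patch_size[0]) computes (both scales are 2 here).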
def split_respective_to_label(self, padding: int = 67):
if self.labels is None:
raise ValueError("No labels found. Only images having labels can be split.")
for i, label in enumerate(self.labels):
tile_reference = f"_lbl-{i+1:02d}"
# print(label.bbox)
metadata = {"image_path": str(self.image_path), "label_path": str(self.label_path), "label_index": str(i)}
xc_norm, yc_norm, w_norm, h_norm = label.bbox  # normalized YOLO coords (xc, yc, w, h)
xc, yc, h, w = [
int(np.round(f))
for f in [
xc_norm * self.image.shape[1],
yc_norm * self.image.shape[0],
h_norm * self.image.shape[0],
w_norm * self.image.shape[1],
]
] # image coords
# print("img coords:", xc, yc, h, w)
pad_xneg = padding + 1 # int(w / 2) + padding
pad_xpos = padding # int(w / 2) + padding
pad_yneg = padding + 1 # int(h / 2) + padding
pad_ypos = padding # int(h / 2) + padding
if xc - pad_xneg < 0:
pad_xneg = xc
if pad_xpos + xc > self.image.shape[1]:
pad_xpos = self.image.shape[1] - xc
if yc - pad_yneg < 0:
pad_yneg = yc
if pad_ypos + yc > self.image.shape[0]:
pad_ypos = self.image.shape[0] - yc
# print("pads:", pad_xneg, pad_xpos, pad_yneg, pad_ypos)
tile = self.image[
yc - pad_yneg : yc + pad_ypos,
xc - pad_xneg : xc + pad_xpos,
]
ny, nx = tile.shape
x_offset = pad_xneg
y_offset = pad_yneg
# print("tile shape:", tile.shape)
yolo_annotation = f"{label.class_id} " # {x_offset/nx} {y_offset/ny} {h_norm} {w_norm} "
yolo_annotation += " ".join(
[
f"{(x*self.image.shape[1]-(xc - x_offset))/nx:.6f} {(y*self.image.shape[0]-(yc-y_offset))/ny:.6f}"
for x, y in label.polygon
]
)
print(yolo_annotation)
new_label = Label(yolo_annotation=yolo_annotation)
yield tile_reference, tile, [new_label], metadata
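A hedged arithmetic check of the re-normalization above (numbers invented).
# Image width 1000, label centre xc = 500, padding = 67, so
# pad_xneg = 68, pad_xpos = 67 and the tile is nx = 135 pixels wide.
# A polygon vertex at normalized x = 0.45 (absolute 450) maps to
#   (450 - (500 - 68)) / 135 = 18 / 135 ≈ 0.133
# inside the tile.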
def main(args):
if args.output:
args.output.mkdir(exist_ok=True, parents=True)
(args.output / "images").mkdir(exist_ok=True)
(args.output / "images-zoomed").mkdir(exist_ok=True)
(args.output / "labels").mkdir(exist_ok=True)
for image_path in (args.input / "images").glob("*.tif"):
data = ImageSplitter(
image_path=image_path,
label_path=(args.input / "labels" / image_path.stem).with_suffix(".txt"),
)
if args.split_around_label:
data = data.split_respective_to_label(padding=args.padding)
else:
data = data.split_into_tiles(patch_size=args.patch_size)
for tile_reference, tile, labels, metadata in data:
print()
print(tile_reference, tile.shape, labels, metadata) # len(labels) if labels else None)
# { debug
debug = False
if debug:
plt.figure(figsize=(10, 10 * tile.shape[0] / tile.shape[1]))
if labels is None:
plt.imshow(tile, cmap="gray")
plt.axis("off")
plt.title(f"{image_path.name} ({tile_reference})")
plt.show()
continue
print(labels[0].bbox)
# Draw annotations
out = draw_annotations(
cv2.cvtColor((tile / tile.max() * 255).astype(np.uint8), cv2.COLOR_GRAY2BGR),
[l.to_string() for l in labels],
alpha=0.1,
)
# Convert BGR -> RGB for matplotlib display
out_rgb = cv2.cvtColor(out, cv2.COLOR_BGR2RGB)
plt.imshow(out_rgb)
plt.axis("off")
plt.title(f"{image_path.name} ({tile_reference})")
plt.show()
# } debug
if args.output:
# imwrite(args.output / "images" / f"{image_path.stem}_{tile_reference}.tif", tile, metadata=metadata)
scale = 5
tile_zoomed = zoom(tile, zoom=scale)
metadata["scale"] = scale
imwrite(
args.output / "images" / f"{image_path.stem}_{tile_reference}.tif",
tile_zoomed,
metadata=metadata,
imagej=True,
)
if labels is not None:
with open(args.output / "labels" / f"{image_path.stem}_{tile_reference}.txt", "w") as f:
for label in labels:
# label.offset_label(tile.shape[1], tile.shape[0])
f.write(label.to_string() + "\n")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input", type=Path)
parser.add_argument("-o", "--output", type=Path)
parser.add_argument(
"-p",
"--patch-size",
nargs=2,
type=int,
default=[2, 2],
help="Number of patches along height and width, rows and columns, respectively",
)
parser.add_argument(
"-sal",
"--split-around-label",
action="store_true",
help="If enabled, the image will be split around the label and for each label, a separate image will be created.",
)
parser.add_argument(
"--padding",
type=int,
default=67,
help="Padding around the label when splitting around the label.",
)
args = parser.parse_args()
main(args)
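Hedged invocation sketches, following the document's usage-line convention. Paths are illustrative; -i must point at a folder holding images/ and labels/ subfolders, and the mixed absolute/local imports at the top of the file assume both the repository root and src/utils are on sys.path (e.g. via PYTHONPATH).
Usage:
PYTHONPATH=.:src/utils python src/utils/image_splitter.py -i data/datasets/train -o data/datasets/train_tiles -p 2 2
PYTHONPATH=.:src/utils python src/utils/image_splitter.py -i data/datasets/train -o data/datasets/train_crops --split-around-label --padding 67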

1
src/utils/show_yolo_seg.py Symbolic link
View File

@@ -0,0 +1 @@
../../tests/show_yolo_seg.py

View File

@@ -0,0 +1,157 @@
"""Ultralytics runtime patches for 16-bit TIFF training.
Goals:
- Use `tifffile` to decode `.tif/.tiff` reliably (OpenCV can silently drop bit-depth depending on codec).
- Preserve 16-bit data through the dataloader as `uint16` tensors.
- Fix Ultralytics trainer normalization (default divides by 255) to scale `uint16` correctly.
- Avoid uint8-forcing augmentations by recommending/setting hyp values (handled by caller).
This module is intended to be imported/called **before** instantiating/using YOLO.
"""
from __future__ import annotations
from typing import Optional
from src.utils.logger import get_logger
logger = get_logger(__name__)
def apply_ultralytics_16bit_tiff_patches(*, force: bool = False) -> None:
"""Apply runtime monkey-patches to Ultralytics to better support 16-bit TIFFs.
This function is safe to call multiple times.
Args:
force: If True, re-apply patches even if already applied.
"""
# Import inside function to ensure patching occurs before YOLO model/dataset is created.
import os
import cv2
import numpy as np
# import tifffile
import torch
from src.utils.image import Image
from ultralytics.utils import patches as ul_patches
already_patched = getattr(ul_patches.imread, "__name__", "") == "tifffile_imread"
if already_patched and not force:
return
_original_imread = ul_patches.imread
def tifffile_imread(filename: str, flags: int = cv2.IMREAD_COLOR, pseudo_rgb: bool = True) -> Optional[np.ndarray]:
"""Replacement for [`ultralytics.utils.patches.imread()`](venv/lib/python3.12/site-packages/ultralytics/utils/patches.py:20).
- For `.tif/.tiff`, uses `tifffile.imread()` and preserves dtype (e.g. uint16).
- For other formats, falls back to Ultralytics' original implementation.
- Always returns HWC (3 dims). For grayscale, returns (H, W, 1) or (H, W, 3) depending on requested flags.
"""
# print("here")
# return _original_imread(filename, flags)
ext = os.path.splitext(filename)[1].lower()
if ext in (".tif", ".tiff"):
arr = Image(filename).get_qt_rgb()[:, :, :3]
# Normalize common shapes:
# - (H, W) -> (H, W, 1)
# - (C, H, W) -> (H, W, C) (heuristic)
if arr is None:
return None
if arr.ndim == 3 and arr.shape[0] in (1, 3, 4) and arr.shape[0] < arr.shape[1]:
arr = np.transpose(arr, (1, 2, 0))
if arr.ndim == 2:
arr = arr[..., None]
# Ensure contiguous array for downstream OpenCV ops.
# logger.info(f"Loading with monkey-patched imread: {filename}")
arr = arr.astype(np.float32)
arr /= arr.max()
arr *= 2**8 - 1
arr = arr.astype(np.uint8)
# print(arr.shape, arr.dtype, any(np.isnan(arr).flatten()), np.where(np.isnan(arr)), arr.min(), arr.max())
return np.ascontiguousarray(arr)
# logger.info(f"Loading with original imread: {filename}")
return _original_imread(filename, flags)
# Patch the canonical reference.
ul_patches.imread = tifffile_imread
# Patch common module-level imports (some Ultralytics modules do `from ... import imread`).
# Importing these modules is safe and helps ensure the patched function is used.
try:
import ultralytics.data.base as _ul_base
_ul_base.imread = tifffile_imread
except Exception:
pass
try:
import ultralytics.data.loaders as _ul_loaders
_ul_loaders.imread = tifffile_imread
except Exception:
pass
# Patch trainer normalization: default divides by 255 regardless of input dtype.
from ultralytics.models.yolo.detect import train as detect_train
_orig_preprocess_batch = detect_train.DetectionTrainer.preprocess_batch
def preprocess_batch_16bit(self, batch: dict) -> dict: # type: ignore[override]
# Start from upstream behavior to keep device placement + multiscale identical,
# but replace the 255 division with dtype-aware scaling.
# logger.info(f"Preprocessing batch with monkey-patched preprocess_batch")
for k, v in batch.items():
if isinstance(v, torch.Tensor):
batch[k] = v.to(self.device, non_blocking=self.device.type == "cuda")
img = batch.get("img")
if isinstance(img, torch.Tensor):
# Decide scaling denom based on dtype (avoid expensive reductions if possible).
if img.dtype == torch.uint8:
denom = 255.0
elif img.dtype == torch.uint16:
denom = 65535.0
elif img.dtype.is_floating_point:
# Assume already in 0-1 range if float.
denom = 1.0
else:
# Generic integer fallback.
try:
denom = float(torch.iinfo(img.dtype).max)
except Exception:
denom = 255.0
batch["img"] = img.float() / denom
# Multi-scale branch copied from upstream to avoid re-introducing `/255` scaling.
if getattr(self.args, "multi_scale", False):
import math
import random
import torch.nn as nn
imgs = batch["img"]
sz = (
random.randrange(int(self.args.imgsz * 0.5), int(self.args.imgsz * 1.5 + self.stride))
// self.stride
* self.stride
)
sf = sz / max(imgs.shape[2:])
if sf != 1:
ns = [math.ceil(x * sf / self.stride) * self.stride for x in imgs.shape[2:]]
imgs = nn.functional.interpolate(imgs, size=ns, mode="bilinear", align_corners=False)
batch["img"] = imgs
return batch
detect_train.DetectionTrainer.preprocess_batch = preprocess_batch_16bit
# Tag function to make it easier to detect patch state.
setattr(detect_train.DetectionTrainer.preprocess_batch, "_ultralytics_16bit_patch", True)
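A hedged sketch of the intended call order: apply the patches before any YOLO object or dataloader is created. The import path of this module, the model name and the dataset YAML are illustrative assumptions.
from src.utils.ultralytics_tiff_patches import apply_ultralytics_16bit_tiff_patches  # assumed module path
from ultralytics import YOLO

apply_ultralytics_16bit_tiff_patches()  # must run before the model/dataset is instantiated
model = YOLO("yolov8s-seg.pt")          # illustrative base model
model.train(data="data/datasets/data.yaml", epochs=100, imgsz=1024)  # illustrative paths/params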

226
tests/show_yolo_seg.py Normal file
View File

@@ -0,0 +1,226 @@
#!/usr/bin/env python3
"""
show_yolo_seg.py
Usage:
python show_yolo_seg.py /path/to/image.jpg /path/to/labels.txt
Supports:
- Segmentation polygons: "class x1 y1 x2 y2 ... xn yn"
- YOLO bbox lines as fallback: "class x_center y_center width height"
Coordinates can be normalized [0..1] or absolute pixels (auto-detected).
"""
import sys
import cv2
import numpy as np
import matplotlib.pyplot as plt
import argparse
from pathlib import Path
import random
from shapely.geometry import LineString
from src.utils.image import Image
def parse_label_line(line):
parts = line.strip().split()
if not parts:
return None
cls = int(float(parts[0]))
coords = [float(x) for x in parts[1:]]
return cls, coords
def coords_are_normalized(coords):
# If every coordinate is between 0 and 1 (inclusive-ish), assume normalized
if not coords:
return False
return max(coords) <= 1.001
def yolo_bbox_to_xyxy(coords, img_w, img_h):
# coords: [xc, yc, w, h] normalized or absolute
xc, yc, w, h = coords[:4]
if max(coords) <= 1.001:
xc *= img_w
yc *= img_h
w *= img_w
h *= img_h
x1 = int(round(xc - w / 2))
y1 = int(round(yc - h / 2))
x2 = int(round(xc + w / 2))
y2 = int(round(yc + h / 2))
return x1, y1, x2, y2
def poly_to_pts(coords, img_w, img_h):
# coords: [x1 y1 x2 y2 ...] either normalized or absolute
if coords_are_normalized(coords):
coords = [coords[i] * (img_w if i % 2 == 0 else img_h) for i in range(len(coords))]
pts = np.array(coords, dtype=np.int32).reshape(-1, 2)
return pts
def random_color_for_class(cls):
random.seed(cls) # deterministic per class
return (
0,
0,
255,
) # tuple(int(x) for x in np.array([random.randint(0, 255) for _ in range(3)]))
def draw_annotations(img, labels, alpha=0.4, draw_bbox_for_poly=True):
# img: BGR numpy array
overlay = img.copy()
h, w = img.shape[:2]
for line in labels:
if isinstance(line, str):
cls, coords = parse_label_line(line)
if isinstance(line, tuple):
cls, coords = line
if not coords:
continue
# polygon case (>=6 coordinates)
if len(coords) >= 6:
color = random_color_for_class(cls)
x1, y1, x2, y2 = yolo_bbox_to_xyxy(coords[:4], w, h)
print(x1, y1, x2, y2)
cv2.rectangle(img, (x1, y1), (x2, y2), color, 1)
pts = poly_to_pts(coords[4:], w, h)
# line = LineString(pts)
# # Buffer distance in pixels
# buffered = line.buffer(3, cap_style=2, join_style=2)
# coords = np.array(buffered.exterior.coords, dtype=np.int32)
# cv2.fillPoly(overlay, [coords], color=(255, 255, 255))
# fill on overlay
cv2.fillPoly(overlay, [pts], color)
# outline on base image
cv2.polylines(img, [pts], isClosed=True, color=color, thickness=1)
# put class text at first point
x, y = int(pts[0, 0]), int(pts[0, 1]) - 6
if 0:
cv2.putText(
img,
str(cls),
(x, max(6, y)),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(255, 255, 255),
2,
cv2.LINE_AA,
)
# YOLO bbox case (4 coords)
elif len(coords) == 4:
x1, y1, x2, y2 = yolo_bbox_to_xyxy(coords, w, h)
color = random_color_for_class(cls)
cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
cv2.putText(
img,
str(cls),
(x1, max(6, y1 - 4)),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(255, 255, 255),
2,
cv2.LINE_AA,
)
else:
# Unknown / invalid format, skip
continue
# blend overlay for filled polygons
cv2.addWeighted(overlay, alpha, img, 1 - alpha, 0, img)
return img
def load_labels_file(label_path):
labels = []
with open(label_path, "r") as f:
for raw in f:
line = raw.strip()
if not line:
continue
parsed = parse_label_line(line)
if parsed:
labels.append(parsed)
return labels
def main():
parser = argparse.ArgumentParser(description="Show YOLO segmentation / polygon annotations")
parser.add_argument("image", type=str, help="Path to image file")
parser.add_argument("--labels", type=str, help="Path to YOLO label file (polygons)")
parser.add_argument("--alpha", type=float, default=0.4, help="Polygon fill alpha (0..1)")
parser.add_argument("--no-bbox", action="store_true", help="Don't draw bounding boxes for polygons")
args = parser.parse_args()
print(args)
img_path = Path(args.image)
if args.labels:
lbl_path = Path(args.labels)
else:
lbl_path = img_path.with_suffix(".txt")
lbl_path = Path(str(lbl_path).replace("images", "labels"))
if not img_path.exists():
print("Image not found:", img_path)
sys.exit(1)
if not lbl_path.exists():
print("Label file not found:", lbl_path)
sys.exit(1)
# img = cv2.imread(str(img_path), cv2.IMREAD_COLOR)
img = (Image(img_path).get_qt_rgb() * 255).astype(np.uint8)
if img is None:
print("Could not load image:", img_path)
sys.exit(1)
labels = load_labels_file(str(lbl_path))
if not labels:
print("No labels parsed from", lbl_path)
# continue and just show image
out = draw_annotations(img.copy(), labels, alpha=args.alpha, draw_bbox_for_poly=(not args.no_bbox))
lclass, coords = labels[0]
print(lclass, coords)
bbox = coords[:4]
print("bbox", bbox)
bbox = np.array(bbox) * np.array([img.shape[1], img.shape[0], img.shape[1], img.shape[0]])
yc, xc, h, w = bbox
print("bbox", bbox)
# polyline = np.array(coords[4:]).reshape(-1, 2) * np.array([img.shape[1], img.shape[0]])
polyline = np.array(coords).reshape(-1, 2) * np.array([img.shape[1], img.shape[0]])
print("pl", coords[4:])
print("pl", polyline)
# Convert BGR -> RGB for matplotlib display
# out_rgb = cv2.cvtColor(out, cv2.COLOR_BGR2RGB)
out_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
# out_rgb = Image()
plt.figure(figsize=(10, 10 * out.shape[0] / out.shape[1]))
plt.imshow(out_rgb)
plt.plot(polyline[:, 0], polyline[:, 1], "y", linewidth=2)
if 0:
plt.plot(
[yc - h / 2, yc - h / 2, yc + h / 2, yc + h / 2, yc - h / 2],
[xc - w / 2, xc + w / 2, xc + w / 2, xc - w / 2, xc - w / 2],
"r",
linewidth=2,
)
# plt.axis("off")
plt.title(f"{img_path.name} ({lbl_path.name})")
plt.show()
if __name__ == "__main__":
main()