Skip to content
Snippets Groups Projects
Commit b5d5c044 authored by Florian Bruhin's avatar Florian Bruhin
Browse files

Split into multiple files

parent 24209b7f
No related branches found
No related tags found
No related merge requests found
__pycache__
/textual.log
/.venv
# AutPy GUI commander
Install dependencies from `requirements.txt`, then run with `python -m commander.gui`.
from __future__ import annotations
import sys
from PyQt6.QtWidgets import QApplication
from commander.gui import mainwidget
if __name__ == '__main__':
app = QApplication(sys.argv)
main = mainwidget.MainWidget()
main.show()
app.exec()
from __future__ import annotations
import logging
from PyQt6.QtCore import pyqtSlot, pyqtSignal, QObject, QTimer, QProcess
from commander.gui import utils
class GitFetcher(QObject):
INTERVAL = 10 * 1000 # 10s
status_message = pyqtSignal(str)
updated = pyqtSignal()
def __init__(self, parent: QObject = None) -> None:
super().__init__(parent)
self.process = QProcess()
self.process.setProgram('git')
self.process.setArguments(['fetch'])
self.process.setWorkingDirectory(str(utils.SUBMISSIONS_REPO))
self.process.setReadChannel(QProcess.ProcessChannel.StandardError)
self.process.readyRead.connect(self.on_ready_read)
self.process.finished.connect(self.on_finished)
self.timer = QTimer()
self.timer.setInterval(self.INTERVAL)
self.timer.timeout.connect(self.on_timeout)
self._had_updates = False
def start(self):
self.on_timeout()
self.timer.start()
@pyqtSlot()
def on_timeout(self):
if self.process.state() != QProcess.ProcessState.NotRunning:
logging.info("Fetch still running...")
return
self._had_updates = False
self.status_message.emit("Fetching...")
self.process.start()
@pyqtSlot()
def on_finished(self):
self.status_message.emit(None)
if self._had_updates:
self.updated.emit()
self._had_updates = False
@pyqtSlot()
def on_ready_read(self):
while self.process.canReadLine():
self._had_updates = True
data = self.process.readLine().data()
logging.info(data.decode('utf-8').rstrip('\n'))
from __future__ import annotations
import logging
from PyQt6.QtWidgets import QWidget, QTextEdit
from PyQt6.QtGui import QTextCursor
class LogHandler(logging.Handler):
def __init__(self, text_edit: QTextEdit):
super().__init__()
self.text_edit = text_edit
formatter = logging.Formatter('%(asctime)s - %(message)s', datefmt='%H:%M:%S')
self.setFormatter(formatter)
def emit(self, record: logging.LogRecord) -> None:
self.text_edit.append_line(self.format(record))
class LogView(QTextEdit):
def __init__(self, parent: QWidget = None) -> None:
super().__init__(parent)
self.setReadOnly(True)
def append_line(self, log: str) -> None:
self.moveCursor(QTextCursor.MoveOperation.End)
self.insertPlainText(log + '\n')
self.ensureCursorVisible()
def init():
logging.basicConfig(level=logging.INFO)
log_view = LogView()
log_handler = LogHandler(log_view)
logging.getLogger().addHandler(log_handler)
return log_view
from __future__ import annotations
from PyQt6.QtWidgets import QApplication, QVBoxLayout, QSplitter, QStatusBar, QWidget
from PyQt6.QtGui import QKeySequence, QAction
from PyQt6.QtCore import Qt
from commander.gui import notebook, report, overview, log, selectors, git
class MainWidget(QWidget):
def __init__(self, parent: QWidget = None) -> None:
super().__init__(parent)
self.layout = QVBoxLayout(self)
self.hsplitter = QSplitter()
self.hsplitter.setHandleWidth(10)
self.layout.addWidget(self.hsplitter)
self.notebook = notebook.NotebookView()
self.hsplitter.addWidget(self.notebook)
self.report = report.ReportView()
self.hsplitter.addWidget(self.report)
# FIXME why is this needed?
width = self.hsplitter.width() // 2 - self.hsplitter.handleWidth()
self.hsplitter.setSizes([width, width])
self.vsplitter = QSplitter()
self.vsplitter.setHandleWidth(10)
self.vsplitter.setOrientation(Qt.Orientation.Vertical)
self.layout.addWidget(self.vsplitter)
self.vsplitter.addWidget(self.hsplitter)
self.overview = overview.OverviewView()
self.vsplitter.addWidget(self.overview)
self.log_view = log.init()
self.vsplitter.addWidget(self.log_view)
for idx, factor in [
(0, 4), # Notebook / Results
(1, 3), # Overview
(2, 1), # Log
]:
self.vsplitter.setStretchFactor(idx, factor)
self.statusbar = QStatusBar()
self.layout.addWidget(self.statusbar)
self.selectors = selectors.SelectorGrid()
self.statusbar.addPermanentWidget(self.selectors)
self.outcome_summary = selectors.OutcomeSummary()
self.statusbar.addPermanentWidget(self.outcome_summary)
self.overview.model().node_ids_loaded.connect(self.selectors.test.on_node_ids_loaded)
self.overview.model().tag_mapping_loaded.connect(self.notebook.on_tag_mapping_loaded)
self.overview.model().outcome_summary_loaded.connect(self.outcome_summary.on_outcome_summary_loaded)
self.overview.test_activated.connect(self.notebook.scroll_to_nodeid)
self.overview.test_activated.connect(self.report.scroll_to_nodeid)
self.selectors.folder.folder_changed.connect(self.notebook.on_folder_changed)
self.selectors.folder.folder_changed.connect(self.report.on_folder_changed)
self.selectors.folder.folder_changed.connect(self.overview.on_folder_changed)
self.selectors.branch.branch_changed.connect(self.selectors.folder.on_branch_changed)
self.selectors.test.test_changed.connect(self.overview.on_test_changed)
self.overview.test_activated.connect(self.selectors.test.on_test_activated)
QApplication.instance().focusChanged.connect(self.selectors.on_focus_changed)
self.selectors.open_defaults()
self.fetcher = git.GitFetcher(self)
self.fetcher.status_message.connect(self.statusbar.showMessage)
self.fetcher.updated.connect(self.selectors.folder.update_completer)
self.fetcher.updated.connect(self.selectors.branch.update_completer)
self.fetcher.start()
for key, widget in [
("1", self.notebook),
("2", self.report),
("3", self.overview),
("4", self.log_view),
]:
action = QAction(self)
action.setShortcut(QKeySequence(f"Ctrl+{key}"))
action.triggered.connect(widget.setFocus)
self.addAction(action)
from __future__ import annotations
import pathlib
import logging
import nbconvert
import nbformat
from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtWidgets import QWidget
from PyQt6.QtCore import pyqtSlot
class NotebookView(QWebEngineView):
def __init__(self, parent: QWidget = None) -> None:
super().__init__(parent)
self.nodeid_to_tag = {}
def _render(self, path: pathlib.Path) -> None:
with path.open('r') as f:
nb = nbformat.read(f, as_version=nbformat.NO_CONVERT)
exporter = nbconvert.HTMLExporter()
html, _resources = exporter.from_notebook_node(nb)
self.setHtml(html)
@pyqtSlot(pathlib.Path)
def on_folder_changed(self, path: pathlib.Path) -> None:
notebooks = list(path.glob('*.ipynb'))
assert len(notebooks) == 1, notebooks
self._render(notebooks[0])
@pyqtSlot(str)
def scroll_to_nodeid(self, nodeid: str) -> None:
tag = self.nodeid_to_tag.get(nodeid)
if tag is None:
logging.info(f"No tag found for {nodeid}")
self.page().runJavaScript('window.scrollTo(0, 0)')
return
self.page().runJavaScript(
f"document.getElementsByClassName('celltag_{tag}')[0].scrollIntoView()")
@pyqtSlot(dict)
def on_tag_mapping_loaded(self, mapping: dict[str, str]) -> None:
self.nodeid_to_tag = mapping
from __future__ import annotations
import json
import pathlib
import collections
from typing import Any
from PyQt6.QtWidgets import QWidget, QTableView
from PyQt6.QtGui import QColor
from PyQt6.QtCore import Qt, pyqtSlot, pyqtSignal, QModelIndex, QAbstractTableModel
from commander.gui import utils
class OverviewModel(QAbstractTableModel):
NodeIdRole = Qt.ItemDataRole.UserRole
_COLUMNS = ["nodeid", "tag", "when", "outcome", "message"]
node_ids_loaded = pyqtSignal(list)
tag_mapping_loaded = pyqtSignal(dict)
outcome_summary_loaded = pyqtSignal(dict)
def __init__(self, parent: QWidget = None) -> None:
super().__init__(parent)
self._reports = []
self._nodeid_to_tag = {}
def rowCount(self, parent: QModelIndex = None) -> int:
return len(self._reports)
def columnCount(self, parent: QModelIndex = None) -> int:
return len(self._COLUMNS)
def find_by_short_nodeid(self, nodeid: str) -> QModelIndex:
matching = [
i for i, r
in enumerate(self._reports)
if utils.shorten_nodeid(r['nodeid']) == nodeid
]
assert len(matching) == 1, matching
return self.index(matching[0], 0)
def _data_nodeid(self, report: dict[str, Any], *, short: bool) -> str:
nodeid = report['nodeid']
if not short:
return nodeid
return utils.shorten_nodeid(nodeid)
def _data_tag(self, report: dict[str, Any]) -> str | None:
return self._nodeid_to_tag[report['nodeid']]
def _data_when(self, report: dict[str, Any]) -> str:
return report['when']
def _data_outcome(self, report: dict[str, Any]) -> str:
if report['outcome'] == 'failed' and report['when'] == 'setup':
return 'error'
return report['outcome']
def _data_outcome_color(self, report: dict[str, Any]) -> QColor:
value = self._data_outcome(report)
return utils.OUTCOME_COLORS[value]
def _data_message(self, report: dict[str, Any]) -> str:
if report['outcome'] == 'passed':
return ''
msg = report['longrepr']['reprcrash']['message']
return msg.replace('\n', ' '*4)[:300]
def data(self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole) -> Any:
key = self._COLUMNS[index.column()]
report = self._reports[index.row()]
if key == 'nodeid' and role == Qt.ItemDataRole.DisplayRole:
return self._data_nodeid(report, short=True)
elif key == 'nodeid' and role == self.NodeIdRole:
return self._data_nodeid(report, short=False)
elif key == 'tag' and role == Qt.ItemDataRole.DisplayRole:
return self._data_tag(report)
elif key == 'when' and role == Qt.ItemDataRole.DisplayRole:
return self._data_when(report)
elif key == 'outcome' and role == Qt.ItemDataRole.DisplayRole:
return self._data_outcome(report)
elif key == 'outcome' and role == Qt.ItemDataRole.ForegroundRole:
return self._data_outcome_color(report)
elif key == 'message' and role == Qt.ItemDataRole.DisplayRole:
return self._data_message(report)
return None
def headerData(self, section: int, orientation: Qt.Orientation, role: int = Qt.ItemDataRole.DisplayRole) -> Any:
if role != Qt.ItemDataRole.DisplayRole or orientation != Qt.Orientation.Horizontal:
return None
return self._COLUMNS[section]
def _read_celltag(self, report: dict[str, Any]) -> str | None:
props = dict(report['user_properties'])
if 'celltag' not in props:
return None
celltag = props['celltag']
if isinstance(celltag, list): # running multiple tags
return celltag[0]
return celltag
def _load_tag_mapping(self, reports: list[dict[str, Any]]) -> None:
self._nodeid_to_tag = {
rep['nodeid']: self._read_celltag(rep)
for rep in reports
}
with utils.TAG_MAPPING_FILE.open('r') as f:
from_json = json.load(f)
self._nodeid_to_tag.update(from_json)
self.tag_mapping_loaded.emit(self._nodeid_to_tag)
def load_reports(self, reports: list[dict[str, Any]]) -> None:
outcome_order = ['error', 'failed', 'passed']
filtered_reports = sorted((
rep
for rep in reports
if rep['$report_type'] == 'TestReport' and
(rep['when'] == 'call' or rep['outcome'] != 'passed')
), key=lambda rep: outcome_order.index(self._data_outcome(rep)))
self._load_tag_mapping(filtered_reports)
outcome_summary = collections.Counter([self._data_outcome(rep) for rep in filtered_reports])
self.outcome_summary_loaded.emit(dict(outcome_summary))
nodeids = [self._data_nodeid(rep, short=True) for rep in filtered_reports]
self.node_ids_loaded.emit(nodeids)
self.beginResetModel()
self._reports = filtered_reports
self.endResetModel()
class OverviewView(QTableView):
outcomes_loaded = pyqtSignal(dict)
test_activated = pyqtSignal(str)
def __init__(self, parent: QWidget = None) -> None:
super().__init__(parent)
self.setModel(OverviewModel(self))
self.activated.connect(self.on_activated)
@pyqtSlot(QModelIndex)
def on_activated(self, index: QModelIndex) -> None:
model = self.model()
nodeid = model.data(index, model.NodeIdRole)
self.test_activated.emit(nodeid)
@pyqtSlot(pathlib.Path)
def on_folder_changed(self, path: pathlib.Path) -> None:
self._render(path / 'report.json')
self.setFocus()
@pyqtSlot(str)
def on_test_changed(self, name: str) -> None:
model = self.model()
idx = model.find_by_short_nodeid(name)
self.selectRow(idx.row())
self.test_activated.emit(model.data(idx, model.NodeIdRole))
self.setFocus()
def _render(self, path: pathlib.Path) -> None:
reports = [
json.loads(line)
for line in path.read_text('utf-8').splitlines()
]
self.model().load_reports(reports)
self.resizeColumnsToContents()
from __future__ import annotations
import pathlib
from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtWidgets import QWidget, QTextEdit, QStackedLayout
from PyQt6.QtGui import QTextCursor, QTextDocument
from PyQt6.QtCore import QUrl, pyqtSlot
class ReportView(QWidget):
def __init__(self, parent: QWidget = None) -> None:
super().__init__(parent)
self._layout = QStackedLayout(self)
self._textedit = QTextEdit()
self._textedit.setReadOnly(True)
self._layout.addWidget(self._textedit)
self._webview = QWebEngineView()
self._layout.addWidget(self._webview)
@pyqtSlot(str)
def scroll_to_nodeid(self, nodeid: str) -> None:
if self._layout.currentWidget() is self._webview:
self._scroll_to_nodeid_html(nodeid)
elif self._layout.currentWidget() is self._textedit:
self._scroll_to_nodeid_md(nodeid)
else:
assert False, self._layout.currentWidget()
def _scroll_to_nodeid_md(self, nodeid: str) -> None:
escaped_id = nodeid.split('::')[1] + ' '
# Searching from end so that search result is at the top
self._textedit.moveCursor(QTextCursor.MoveOperation.End)
self._textedit.find(escaped_id, QTextDocument.FindFlag.FindBackward)
def _scroll_to_nodeid_html(self, nodeid: str) -> None:
nodeid = nodeid.replace(r'\n', " ")
self._webview.page().runJavaScript(
"""
for (let elem of document.getElementsByClassName('col-name')) {
if (elem.innerText.replace('::setup', '') == '%NODEID%') {
elem.scrollIntoView();
break;
}
}
""".replace('%NODEID%', nodeid))
@pyqtSlot(pathlib.Path)
def on_folder_changed(self, path: pathlib.Path) -> None:
md_path = path / 'report.md'
html_path = path / 'report.html'
if html_path.exists():
self._render_html(html_path)
else:
self._render_md(md_path)
def _render_md(self, path: pathlib.Path) -> None:
self._textedit.setMarkdown(path.read_text('utf-8'))
self._layout.setCurrentWidget(self._textedit)
self.setFocusProxy(self._textedit)
def _render_html(self, path: pathlib.Path) -> None:
self._webview.setUrl(QUrl.fromLocalFile(str(path)))
self._layout.setCurrentWidget(self._webview)
self.setFocusProxy(self._webview)
from __future__ import annotations
import pathlib
import subprocess
import logging
from typing import Any
from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtWidgets import QWidget, QTextEdit, QLineEdit, QCompleter, QLabel, QGridLayout
from PyQt6.QtGui import QKeySequence, QAction
from PyQt6.QtCore import Qt, pyqtSlot, pyqtSignal, QStringListModel
from commander.gui import utils
class Selector(QLineEdit):
SHORTCUT = None
FILTER_MODE = Qt.MatchFlag.MatchStartsWith
def __init__(self, parent: QWidget = None) -> None:
super().__init__(parent)
self._completer = QCompleter()
self._completer.setFilterMode(self.FILTER_MODE)
self.setCompleter(self._completer)
self.data = []
self.update_completer()
action = QAction(self)
action.setShortcut(self.SHORTCUT)
action.triggered.connect(self.on_triggered)
self.addAction(action)
self.editingFinished.connect(self.on_editing_finished)
def open_default(self):
self.setText(self._default_value())
self.editingFinished.emit()
def _get_data(self) -> list[str]:
raise NotImplementedError
def _default_value(self) -> str:
return self.data[0]
def accept_text(self, text: str) -> None:
raise NotImplementedError
@pyqtSlot()
def update_completer(self) -> None:
self.data = self._get_data()
self._completer.setModel(QStringListModel(self.data))
@pyqtSlot()
def on_triggered(self) -> None:
self.clear()
self.setFocus()
self._completer.setCompletionPrefix('')
self._completer.complete()
@pyqtSlot()
def on_editing_finished(self) -> None:
text = self.text()
if text not in self.data:
return
self.accept_text(text)
class FolderSelector(Selector):
folder_changed = pyqtSignal(pathlib.Path)
SHORTCUT = QKeySequence("Ctrl+F")
def _get_data(self) -> list[str]:
return [
str(p.relative_to(utils.SUBMISSIONS_REPO))
for p in utils.SUBMISSIONS_REPO.glob('[0-9]*/[0-9]*')
]
@pyqtSlot()
def on_branch_changed(self) -> None:
self.update_completer()
if self.text() in self.data:
self.accept_text(self.text())
else:
self.open_default()
def accept_text(self, text: str) -> None:
path = utils.SUBMISSIONS_REPO / text
self.folder_changed.emit(path)
class BranchSelector(Selector):
branch_changed = pyqtSignal(str)
SHORTCUT = QKeySequence("Ctrl+B")
def _run_git(self, *args: str, **kwargs: Any) -> subprocess.CompletedProcess:
return subprocess.run(
['git'] + list(args),
cwd=utils.SUBMISSIONS_REPO,
text=True,
check=True,
capture_output=True,
**kwargs)
def _get_git_branches(self) -> list[str]:
return self._run_git(
'for-each-ref',
'--format=%(refname:short)',
'--sort=-authordate',
'refs/remotes/origin').stdout.splitlines()
def _get_data(self) -> list[str]:
return [
branch.removeprefix('origin/')
for branch in self._get_git_branches()
if '.' in branch
]
def accept_text(self, text: str) -> None:
proc = self._run_git('checkout', f'origin/{text}')
logging.info(proc.stderr)
self.branch_changed.emit(text)
class TestSelector(Selector):
test_changed = pyqtSignal(str)
SHORTCUT = QKeySequence("Ctrl+T")
FILTER_MODE = Qt.MatchFlag.MatchContains
@pyqtSlot(list)
def on_node_ids_loaded(self, data: list[str]) -> None:
self.data = data
self.update_completer()
@pyqtSlot(str)
def on_test_activated(self, nodeid: str) -> None:
self.setText(utils.shorten_nodeid(nodeid))
def _get_data(self) -> list[str]:
return self.data # nop
def _default_value(self) -> str:
return ''
def accept_text(self, text: str) -> None:
self.test_changed.emit(text)
class OutcomeSummary(QLabel):
@pyqtSlot(dict)
def on_outcome_summary_loaded(self, summary: dict[str, int]) -> None:
order = ['passed', 'failed', 'error']
parts = []
for outcome, count in sorted(summary.items(), key=lambda item: order.index(item[0])):
color = utils.OUTCOME_COLORS[outcome]
parts.append(f"<font color='{color.name()}'>{count} {outcome}</font>")
grade = 5 * summary['passed'] / sum(summary.values()) + 1
parts.append(f'{round(grade, 2)}')
self.setText('<br>'.join(parts))
class SelectorGrid(QWidget):
def __init__(self, parent: QWidget = None) -> None:
super().__init__(parent)
layout = QGridLayout()
self.setLayout(layout)
layout.setContentsMargins(0, 0, 0, 0)
self.test = TestSelector()
self.folder = FolderSelector()
self.branch = BranchSelector()
self.focus = QLabel()
items = [
('<b>Focus</b> (Ctrl+1/2/3/4):', self.focus),
('<b>Test</b> (Ctrl+T):', self.test),
('<b>Folder</b> (Ctrl+F):', self.folder),
('<b>Branch</b> (Ctrl+B):', self.branch),
]
for col, (label, selector) in enumerate(items):
layout.addWidget(QLabel(label), 0, col)
layout.addWidget(selector, 1, col)
@pyqtSlot(QWidget, QWidget)
def on_focus_changed(self, old: QWidget, new: QWidget) -> None:
while type(new) in [QWidget, QWebEngineView, QTextEdit]:
# FIXME a bit of a hack...
new = new.parent()
self.focus.setText(new.__class__.__name__)
def open_defaults(self):
self.folder.open_default()
self.branch.open_default()
self.test.open_default()
import pathlib
from PyQt6.QtCore import Qt
from PyQt6.QtGui import QColor
REPO_DIR = pathlib.Path(__file__).parents[2]
SUBMISSIONS_REPO = REPO_DIR.parent / 'submissions'
TAG_MAPPING_FILE = REPO_DIR / 'nodeid-to-tag.json'
OUTCOME_COLORS = {
'passed': QColor(Qt.GlobalColor.darkGreen),
'failed': QColor(Qt.GlobalColor.red),
'error': QColor(Qt.GlobalColor.darkRed),
'skipped': QColor(Qt.GlobalColor.yellow),
}
def shorten_nodeid(nodeid: str) -> str:
path, name = nodeid.split('::')
return pathlib.PosixPath(path).name + '::' + name
File moved
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment