Source code for mdaviz.chartview

"""
Charting widget
"""

import datetime
from functools import partial
from itertools import cycle
import numpy
from PyQt5 import QtCore, QtWidgets
from . import utils

from matplotlib.figure import Figure
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar


FONTSIZE = 10
LEFT_BUTTON = 1
MIDDLE_BUTTON = 2
RIGHT_BUTTON = 3

# https://pyqtgraph.readthedocs.io/en/latest/api_reference/graphicsItems/scatterplotitem.html#pyqtgraph.ScatterPlotItem.setSymbol
# https://developer.mozilla.org/en-US/docs/Web/CSS/named-color
# Do NOT sort these colors alphabetically!  There should be obvious
# contrast between adjacent colors.
PLOT_COLORS = """
    r g b c m
    goldenrod
    lime
    orange
    blueviolet
    brown
    teal
    olive
    lightcoral
    cornflowerblue
    forestgreen
    salmon
""".split()
PLOT_SYMBOLS = """o + x star s d t t2 t3""".split()
_AUTO_COLOR_CYCLE = cycle(PLOT_COLORS)
_AUTO_SYMBOL_CYCLE = cycle(PLOT_SYMBOLS)


[docs] def auto_color(): """Returns next color for pens and brushes.""" return next(_AUTO_COLOR_CYCLE)
[docs] def auto_symbol(): """Returns next symbol for scatter plots.""" return next(_AUTO_SYMBOL_CYCLE)
[docs] class ChartView(QtWidgets.QWidget): """TODO: docstrings""" def __init__(self, parent, **kwargs): # parent=<mdaviz.mda_folder.MDA_MVC object at 0x10e7ff520> self.mda_mvc = parent super().__init__() ############# UI initialization: # Create a Matplotlib figure and canvas self.figure = Figure() self.canvas = FigureCanvas(self.figure) self.main_axes = self.figure.add_subplot(111) # Adjust margins self.figure.subplots_adjust(bottom=0.1, top=0.9, right=0.92) # Create the navigation toolbar self.toolbar = NavigationToolbar(self.canvas, self) # Use a QVBoxLayout for stacking the toolbar and canvas vertically layout = QtWidgets.QVBoxLayout(self) layout.addWidget(self.toolbar) layout.addWidget(self.canvas) # Apply the QVBoxLayout to the ChartView widget self.setLayout(layout) # Plot configuration plot_options = kwargs.get("plot_options", {}) self.setTitle(plot_options.get("title", "")) self.setXlabel(plot_options.get("y", "")) self.setYlabel(plot_options.get("x", "")) self.configPlot() ############# Signals & slots: # Track curves and display in QComboBox: self.plotObjects = {} # all the Line2D on the graph, key = curveID self.curveBox = self.mda_mvc.mda_file_viz.curveBox self.curveBox.currentTextChanged.connect(self.onCurveSelected) # Initialize CurveManager self.curveManager = CurveManager(self) self.curveManager.curveAdded.connect(self.onCurveAdded) self.curveManager.curveUpdated.connect(self.onCurveUpdated) self.curveManager.curveRemoved.connect(self.onCurveRemoved) self.curveManager.allCurvesRemoved.connect(self.onAllCurvesRemoved) # # Debug signals: # self.curveManager.curveAdded.connect(utils.debug_signal) # self.curveManager.curveRemoved.connect(utils.debug_signal) # self.curveManager.curveUpdated.connect(utils.debug_signal) # self.curveManager.allCurvesRemoved.connect(utils.debug_signal) # Remove buttons definitions: self.clearAll = self.mda_mvc.mda_file_viz.clearAll self.removeButton = self.mda_mvc.mda_file_viz.curveRemove self.removeCursor1 = self.mda_mvc.mda_file_viz.cursor1_remove self.removeCursor2 = self.mda_mvc.mda_file_viz.cursor2_remove # Remove button connections: utils.reconnect(self.clearAll.clicked, self.curveManager.allCurvesRemoved) utils.reconnect(self.removeButton.clicked, self.onRemoveButtonClicked) self.removeCursor1.clicked.connect(partial(self.onRemoveCursor, cursor_num=1)) self.removeCursor2.clicked.connect(partial(self.onRemoveCursor, cursor_num=2)) # File tableview & graph synchronization: self.mda_mvc.mda_file.tabManager.tabRemoved.connect(self.onTabRemoved) self.mda_mvc.detRemoved.connect(self.onDetRemoved) # self.mda_mvc.detRemoved.connect(utils.debug_signal) # Connect offset & factor QLineEdit: self.offset_value = self.mda_mvc.mda_file_viz.offset_value self.factor_value = self.mda_mvc.mda_file_viz.factor_value self.offset_value.editingFinished.connect(self.onOffsetUpdated) self.factor_value.editingFinished.connect(self.onFactorUpdated) # Connect the click event to a handler self.cid = self.figure.canvas.mpl_connect("button_press_event", self.onclick) self.cursors = { 1: None, "pos1": None, "text1": "middle click", 2: None, "pos2": None, "text2": "right click", "diff": "n/a", "midpoint": "n/a", } ########################################## Set & get methods: def setPlotTitle(self, text): self.main_axes.set_title(text, fontsize=FONTSIZE, y=1.03) def setBottomAxisText(self, text): self.main_axes.set_xlabel(text, fontsize=FONTSIZE, labelpad=10) def setLeftAxisText(self, text): self.main_axes.set_ylabel(text, fontsize=FONTSIZE, labelpad=20) def title(self): return self._title def xlabel(self): return self._xlabel def ylabel(self): return self._ylabel def setTitle(self, txt=""): self._title = txt def setXlabel(self, txt=""): self._xlabel = txt def setYlabel(self, txt=""): self._ylabel = txt def getSelectedCurveID(self): return self.curveBox.currentText() ########################################## Slot methods: def onCurveAdded(self, curveID): # Add to graph curveData = self.curveManager.getCurveData(curveID) ds = curveData["ds"] ds_options = curveData["ds_options"] # Plot and store the plot object associated with curveID: try: plot_obj = self.main_axes.plot(*ds, **ds_options)[0] self.plotObjects[curveID] = plot_obj except Exception as exc: print(str(exc)) # Update plot self.updatePlot(update_title=True) # Add to the comboBox index = self.curveBox.count() # Get the next index self.curveBox.addItem(curveID) file_path = curveData.get("file_path", "No file path available") self.curveBox.setItemData(index, file_path, QtCore.Qt.ToolTipRole) def onCurveUpdated(self, curveID, recompute_y=False, update_x=False): curve_data = self.curveManager.getCurveData(curveID) if curve_data and recompute_y: factor = curve_data.get("factor", 1) offset = curve_data.get("offset", 0) ds = curve_data["ds"] new_y = numpy.multiply(ds[1], factor) + offset if curveID in self.plotObjects: self.plotObjects[curveID].set_ydata(new_y) if curve_data and update_x: ds = curve_data["ds"] new_x = curve_data["ds"][0] if curveID in self.plotObjects: self.plotObjects[curveID].set_xdata(new_x) self.updatePlot(update_title=False) def onRemoveButtonClicked(self): curveID = self.getSelectedCurveID() if curveID in self.curveManager.curves(): curveID = self.getSelectedCurveID() if curveID in self.curveManager.curves(): if len(self.curveManager.curves()) == 1: self.curveManager.removeAllCurves(doNotClearCheckboxes=False) else: self.curveManager.removeCurve(curveID) def onCurveRemoved(self, *arg): curveID, curveData, count = arg # Remove curve from graph & plotObject dict if curveID in self.plotObjects: curve_obj = self.plotObjects[curveID] curve_obj.remove() del self.plotObjects[curveID] # Remove checkbox from corresponding tableview row = curveData["row"] file_path = curveData["file_path"] tableview = self.mda_mvc.mda_file.tabPath2Tableview(file_path) if tableview and tableview.tableView.model(): tableview.tableView.model().uncheckCheckBox(row) # Remove curve from comboBox self.removeItemCurveBox(curveID) # Update plot labels, legend and title self.updatePlot(update_title=False) # If this was the last curve for this file, remove the tab if count == 0 and self.mda_mvc.mda_file.mode() == "Auto-add": self.mda_mvc.mda_file.tabManager.removeTab(file_path) def onAllCurvesRemoved(self, doNotClearCheckboxes=True): # Clears the plot completely, removing all curve representations. self.clearPlot() for curveID in self.curveManager.curves().keys(): self.curveManager.removeCurve(curveID) if not doNotClearCheckboxes: # Iterates over each tab, accessing its associated tableview to clear all checkbox selections. for index in range(self.mda_mvc.mda_file.tabWidget.count()): tableview = self.mda_mvc.mda_file.tabIndex2Tableview(index) if tableview and tableview.tableView.model(): tableview.tableView.model().clearAllCheckboxes() tableview.tableView.model().setHighlightRow() def onDetRemoved(self, file_path, row): curveID = self.curveManager.findCurveID(file_path, row) if curveID: self.curveManager.removeCurve(curveID) def onTabRemoved(self, file_path): if self.mda_mvc.mda_file.mode() in ["Auto-add"]: for curveID in self.curveManager.curves().keys(): if self.curveManager.curves()[curveID]["file_path"] == file_path: self.curveManager.removeCurve(curveID) ########################################## UI methods:
[docs] def plot(self, row, *ds, **options): """The main method called by MDA_MVC""" self.main_axes.axis("on") self.curveManager.addCurve(row, *ds, **options)
def configPlot(self, grid=True): self.setLeftAxisText(self.ylabel()) self.setBottomAxisText(self.xlabel()) self.setPlotTitle(self.title()) if grid: self.main_axes.grid(True, color="#cccccc", linestyle="-", linewidth=0.5) else: self.main_axes.grid(False) self.canvas.draw() def updatePlot(self, update_title=True): # Collect positioner PVs from all curves and update x label: x_label_set = set() for curveID in self.curveManager.curves(): plot_options = self.curveManager.getCurveData(curveID).get("plot_options") x_label = plot_options.get("x", "") if x_label: x_label_set.add(x_label) self.setXlabel(", ".join(list(x_label_set))) # Update the y-axis label and basic math based on the selected curve curveID = self.getSelectedCurveID() if curveID in self.curveManager.curves(): self.updateBasicMathInfo(curveID) plot_options = self.curveManager.getCurveData(curveID).get("plot_options") if plot_options: self.setYlabel(plot_options.get("y", "")) # Update title: if update_title: now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.setTitle(f"Plot Date & Time: {now}") # Recompute the axes limits and autoscale: self.main_axes.relim() self.main_axes.autoscale_view() self.updateLegend() self.configPlot() self.canvas.draw() def updateLegend(self): labels = self.main_axes.get_legend_handles_labels()[1] valid_labels = [label for label in labels if not label.startswith("_")] if valid_labels: self.main_axes.legend() def clearPlot(self): self.main_axes.clear() self.main_axes.axis("off") self.main_axes.set_title("") self.clearCursors() self.clearCursorInfo() self.clearBasicMath() self.figure.canvas.draw() self.plotObjects = {} self.curveBox.clear() def hasDataItems(self): # Return whether any artists have been added to the Axes (bool) return self.main_axes.has_data() ########################################## Interaction with UI elements: def onCurveSelected(self, curveID): # Update QLineEdit & QLabel widgets with the values for the selected curve if curveID in self.plotObjects and curveID in self.curveManager.curves(): curve_data = self.curveManager.getCurveData(curveID) file_path = curve_data["file_path"] row = curve_data["row"] self.offset_value.setText(str(curve_data["offset"])) self.factor_value.setText(str(curve_data["factor"])) self.curveBox.setToolTip(file_path) try: self.mda_mvc.mda_file.highlightRowInTab(file_path, row) except Exception as exc: print(str(exc)) print("highlightRowInTab failed; ignoring exception.") else: self.offset_value.setText("0") self.factor_value.setText("1") self.curveBox.setToolTip("Selected curve") # Update basic math info: self.updateBasicMathInfo(curveID) def removeItemCurveBox(self, curveID): # Returns the index of the item containing the given text ; otherwise returns -1. i = self.curveBox.findText(curveID) if i >= 0: self.curveBox.removeItem(i) def onOffsetUpdated(self): curveID = self.getSelectedCurveID() try: offset = float(self.offset_value.text()) except ValueError: offset = 0 # Reset to default if conversion fails self.offset_value.setText(str(offset)) return self.curveManager.updateCurveOffset(curveID, offset) def onFactorUpdated(self): curveID = self.getSelectedCurveID() try: factor = float(self.factor_value.text()) except ValueError: factor = 1 # Reset to default if conversion fails or zero self.factor_value.setText(str(factor)) return self.curveManager.updateCurveFactor(curveID, factor) ########################################## Basic maths methods: def updateBasicMathInfo(self, curveID): if curveID and curveID in self.curveManager.curves(): try: curve_data = self.curveManager.getCurveData(curveID) x = curve_data["ds"][0] y = curve_data["ds"][1] stats = self.calculateBasicMath(x, y) for i, txt in zip( stats, ["min_text", "max_text", "com_text", "mean_text"] ): if isinstance(i, tuple): result = f"({utils.num2fstr(i[0])}, {utils.num2fstr(i[1])})" else: result = f"{utils.num2fstr(i)}" if i else "n/a" self.mda_mvc.findChild(QtWidgets.QLabel, txt).setText(result) except Exception as exc: print(str(exc)) self.clearBasicMath() else: self.clearBasicMath() def clearBasicMath(self): for txt in ["min_text", "max_text", "com_text", "mean_text"]: self.mda_mvc.findChild(QtWidgets.QLabel, txt).setText("n/a") def calculateBasicMath(self, x_data, y_data): x_array = numpy.array(x_data) y_array = numpy.array(y_data) # Find y_min and y_max y_min = numpy.min(y_array) y_max = numpy.max(y_array) # Find the indices of the min and max y value y_min_index = numpy.argmin(y_array) y_max_index = numpy.argmax(y_array) # Find the corresponding x values for y_min and y_max x_at_y_min = x_array[y_min_index] x_at_y_max = x_array[y_max_index] # Calculate x_com and y_mean x_com = ( numpy.sum(x_array * y_array) / numpy.sum(y_array) if numpy.sum(y_array) != 0 else None ) y_mean = numpy.mean(y_array) return (x_at_y_min, y_min), (x_at_y_max, y_max), x_com, y_mean ########################################## Cursors methods: def onRemoveCursor(self, cursor_num): cross = self.cursors.get(cursor_num) if cross: cross.remove() self.cursors[cursor_num] = None self.cursors[f"pos{cursor_num}"] = None self.cursors[f"text{cursor_num}"] = ( "middle click" if cursor_num == 1 else "right click" ) self.cursors["diff"] = "n/a" self.cursors["midpoint"] = "n/a" self.updateCursorInfo() # Recompute the axes limits and autoscale: self.main_axes.relim() self.main_axes.autoscale_view() self.canvas.draw() def clearCursors(self): self.onRemoveCursor(1) self.onRemoveCursor(2) def onclick(self, event): # Check if the click was in the main_axes if event.inaxes is self.main_axes: # Middle click for red cursor if event.button == MIDDLE_BUTTON: if self.cursors[1]: self.cursors[1].remove() # Remove existing red cursor (self.cursors[1],) = self.main_axes.plot( event.xdata, event.ydata, "r+", markersize=15, linewidth=2 ) # Update cursor position self.cursors["pos1"] = (event.xdata, event.ydata) # Right click for blue cursor elif event.button == RIGHT_BUTTON: if self.cursors[2]: self.cursors[2].remove() # Remove existing blue cursor (self.cursors[2],) = self.main_axes.plot( event.xdata, event.ydata, "b+", markersize=15, linewidth=2 ) # Update cursor position self.cursors["pos2"] = (event.xdata, event.ydata) # Update the info panel with cursor positions self.calculateCursors() # Redraw the canvas to display the new markers self.canvas.draw()
[docs] def calculateCursors(self): """ Update cursor information in info panel widget. """ # Check for the first cursor and update text accordingly if self.cursors[1]: x1, y1 = self.cursors["pos1"] self.cursors["text1"] = f"({utils.num2fstr(x1)}, {utils.num2fstr(y1)})" # Check for the second cursor and update text accordingly if self.cursors[2]: x2, y2 = self.cursors["pos2"] self.cursors["text2"] = f"({utils.num2fstr(x2)}, {utils.num2fstr(y2)})" # Calculate differences and midpoints only if both cursors are present if self.cursors[1] and self.cursors[2]: delta_x = x2 - x1 delta_y = y2 - y1 midpoint_x = (x1 + x2) / 2 midpoint_y = (y1 + y2) / 2 self.cursors["diff"] = ( f"({utils.num2fstr(delta_x)}, {utils.num2fstr(delta_y)})" ) self.cursors["midpoint"] = ( f"({utils.num2fstr(midpoint_x)}, {utils.num2fstr(midpoint_y)})" ) self.updateCursorInfo()
def updateCursorInfo(self): self.mda_mvc.mda_file_viz.pos1_text.setText(self.cursors["text1"]) self.mda_mvc.mda_file_viz.pos2_text.setText(self.cursors["text2"]) self.mda_mvc.mda_file_viz.diff_text.setText(self.cursors["diff"]) self.mda_mvc.mda_file_viz.midpoint_text.setText(self.cursors["midpoint"]) def clearCursorInfo(self): self.mda_mvc.mda_file_viz.pos1_text.setText("middle click") self.mda_mvc.mda_file_viz.pos2_text.setText("right click") self.mda_mvc.mda_file_viz.diff_text.setText("n/a") self.mda_mvc.mda_file_viz.midpoint_text.setText("n/a")
# ------ Curves management (data):
[docs] class CurveManager(QtCore.QObject): curveAdded = QtCore.pyqtSignal(str) # Emit curveID when a curve is added curveRemoved = QtCore.pyqtSignal(str, dict, int) # Emit curveID & its corresponding data when a curve is removed, plus the # number of curves left on the graph for this file curveUpdated = QtCore.pyqtSignal( str, bool, bool ) # Emit curveID, recompute_y (bool) & update_x (bool) when a curve is updated allCurvesRemoved = QtCore.pyqtSignal( bool ) # Emit a doNotClearCheckboxes bool when all curve are removed def __init__(self, parent=None): super().__init__(parent) self._curves = {} # Store curves with a unique identifier as the key
[docs] def addCurve(self, row, *ds, **options): """Add a new curve to the manager if not already present on the graph.""" # Extract info: plot_options = options.get("plot_options", {}) ds_options = options.get("ds_options", {}) label = ds_options.get("label", "unknown label") file_path = plot_options.get("filePath", "unknown path") # Generate unique label & update options: ds_options["label"] = curveID = self.generateCurveID(label, file_path) x_data = ds[0] if curveID in self._curves: # Check if x_data is the same existing_x_data = self._curves[curveID]["ds"][0] if numpy.array_equal(x_data, existing_x_data): print(" x_data is the same, do not add or update the curve") # x_data is the same, do not add or update the curve return else: print(" x_data is different, update the curve") # x_data is different, update the curve: curveData = self._curves[curveID] curveData["ds"] = ds curveData["plot_options"] = plot_options self.updateCurve(curveID, curveData, update_x=True) return # Add new curve if not already present on the graph: self._curves[curveID] = { "ds": ds, # ds = [x_data, y_data] "offset": 0, # default offset "factor": 1, # default factor "row": row, # DET checkbox row in the file tableview "file_path": file_path, "file_name": plot_options.get("fileName", ""), # without ext "plot_options": plot_options, "ds_options": ds_options, } self.curveAdded.emit(curveID)
[docs] def updateCurve(self, curveID, curveData, recompute_y=False, update_x=False): """Update an existing curve.""" if curveID in self._curves: print(f"Emits curveUpdated {curveID=}, {recompute_y=}, {update_x=}") self._curves[curveID] = curveData self.curveUpdated.emit(curveID, recompute_y, update_x)
[docs] def removeCurve(self, curveID): """Remove a curve from the manager.""" if curveID in self._curves: curveData = self._curves[curveID] file_path = curveData["file_path"] # Remove curve entry from self.curves & emit signal: del self._curves[curveID] # How many curves are left for this file: count = 0 for curve_data in self._curves.values(): if curve_data["file_path"] == file_path: count += 1 # Emit signal: self.curveRemoved.emit(curveID, curveData, count)
[docs] def removeAllCurves(self, doNotClearCheckboxes=True): """Remove all curves from the manager.""" self._curves.clear() self.allCurvesRemoved.emit(doNotClearCheckboxes)
[docs] def getCurveData(self, curveID): """Get curve data by ID.""" return self._curves.get(curveID, None)
[docs] def curves(self): """Returns a read-only view of the currently managed curves.""" return dict(self._curves)
def updateCurveOffset(self, curveID, new_offset): curve_data = self.getCurveData(curveID) if curve_data: offset = curve_data["offset"] if offset != new_offset: curve_data["offset"] = new_offset self.updateCurve(curveID, curve_data, recompute_y=True) def updateCurveFactor(self, curveID, new_factor): curve_data = self.getCurveData(curveID) if curve_data: factor = curve_data["factor"] if factor != new_factor: curve_data["factor"] = new_factor self.updateCurve(curveID, curve_data, recompute_y=True)
[docs] def generateCurveID(self, label, file_path): """ Generates a unique curve label for a given label, considering the file path. Parameters: - label (str): The original label for the curve: "file_name: PV_name (PV_unit)" or "file_name: PV_name" (if no PV_unit) - file_path (str): The file path associated with the curve. Returns: - str: A unique curve label. If the exact label already exists for different file path, a numeric suffix is appended: "file_name: PV_name (PV_unit) (1)" or "file_name: PV_name (1)" .. note:: This method allows each curve to be uniquely identified and selected, even if their base labels are identical, by considering their file paths. """ counter = 1 original_label = label # Loop through existing labels: while True: # Check if the current label exists: if label in self._curves: existing_path = self._curves[label].get("file_path") if existing_path != file_path: label = f"{original_label} ({counter})" counter += 1 else: break # If file_path is equal, then the curve is already on the graph. else: break # If the label doesn't exist already, it's automatically unique. return label
[docs] def findCurveID(self, file_path, row): """ Find the curveID based on the file path and row number. Parameters: - file_path (str): The path of the file associated with the curve. - row (int): The row number in the file tableview associated with the curve. Returns: - str: The curveID if a matching curve is found; otherwise, None. """ for curveID, curveData in self._curves.items(): if curveData["file_path"] == file_path and curveData["row"] == row: return curveID return None