Как предотвратить сбой при выполнении многопоточных вычислений, а затем построение выданных результатов

Я пишу приложение PySide2, которое отображает результаты для определенного расчета и пытается выполнить многопоточный расчет, чтобы избежать блокировки графического интерфейса. Я пытаюсь использовать QThreadPool, чтобы при взаимодействии с параметрами, относящимися к графику, запустить расчет в отдельном потоке, который возвращает результаты через сигнал в метод обратного вызова, который отображает результаты с использованием matplotlib.

Проблема в том, что когда я слишком быстро (но не слишком быстро) меняю выбор параметров, приложение вылетает. Этого не происходит, если резьба удалена.

Я знаю, что многие проблемы вызваны тем, что построение графика происходит в рабочем потоке, а не в основном потоке, поэтому я полагаю, что убедился, что построение графика происходит только в основном потоке.

Я предполагаю, что часть проблемы заключается в том, что я могу неправильно понимать, что и где работает при использовании сигналов и слотов. Я пытался найти, какой поток используется в разных точках кода, но могу использовать только QThread.currentThread(), который возвращает адрес и на самом деле не помогает, поскольку QThread.currentThreadId() приводит к этой ошибке: AttributeError: type object «PySide2.QtCore.QThread» не имеет атрибута «currentThreadId».

Я попытался изолировать такое поведение, написав минимальную версию приложения, которое вылетает аналогичным образом, большинство из которых я привел ниже. Я исключил расчет, так как не уверен, что могу им поделиться, и заменил параметры графика на QListWidget с несколькими параметрами. Для сбоя требуется больше взаимодействия, чем для правильного приложения, которое в некоторых случаях вылетает после выбора всего нескольких параметров в течение секунды или двух, но, надеюсь, иллюстрирует суть.

class MainWindow(QObject):
    def __init__(self, parent=None):
        super(MainWindow, self).__init__(parent)
        self.main_window = QMainWindow()
        self.main_window.setCentralWidget(QWidget())
        self.main_window.centralWidget().setLayout(QHBoxLayout())
        self.setup_main_window()

    def setup_main_window(self):
        print(f'setup_main_window thread address: {QThread.currentThreadId()}')
        self.load_list()
        self.plot_figure = PlotFigure()
        self.canvas = FigureCanvas(self.plot_figure)
        self.plot_figure.plot(update=False)
        self.main_window.centralWidget().layout().addWidget(self.canvas)

    def load_list(self):
        self.order_list = QListWidget(self.main_window)
        self.list_items = [
            QListWidgetItem('1', self.order_list),
            QListWidgetItem('2', self.order_list),
            QListWidgetItem('3', self.order_list),
            QListWidgetItem('4', self.order_list),
        ]
        self.order_list.itemClicked.connect(self.order_list_item_changed)
        self.main_window.centralWidget().layout().addWidget(self.order_list)

    def order_list_item_changed(self):
        print(f'order_list_item_changed thread address: {QThread.currentThreadId()}')
        self.plot_figure.plot()

    def show(self):
        if self.main_window is not None:
            self.main_window.show()

class PlotFigure(Figure):
    def __init__(self):
        super().__init__()

    def plot(self):
        print(f'plot thread address: {QThread.currentThreadId()}')
        print(f'update: {update}')
        print(f'connecting signals')
        worker = Worker(self.calc)
        #worker.signals.close.connect(self.set_end_calc)
        worker.signals.finished.connect(self.plot_result)
        print(f'threads: {QThreadPool.globalInstance().activeThreadCount()}')
        QThreadPool.globalInstance().start(worker)

    def plot_result(self, m, xs, ys):
        print(f'plot_result thread address: {QThread.currentThreadId()}')
        print('plotting')
        fig = self.canvas.figure
        fig.clear()
        self.axis = fig.add_subplot(111)
        self.image = self.axis.imshow(m,
                origin='lower', 
                aspect='auto',
                cmap=matplotlib.cm.get_cmap('inferno'), 
                interpolation='bilinear',
                extent=(xs[0], xs[-1], ys[0], ys[-1])
        )
        self.canvas.draw()
class WorkerSignals(QtCore.QObject):
    close = QtCore.Signal(bool)
    start = QtCore.Signal(bool)
    finished = QtCore.Signal(list, list, list)

class Worker(QtCore.QRunnable):
    def __init__(self, worker_method):
        super(Worker, self).__init__()
        self.signals = WorkerSignals()
        self.worker_method = worker_method

    def run(self):
        self.signals.close.emit(True)
        print('close signal sent')
        m, xs, ys = self.worker_method()
        print('calc done')
        self.signals.finished.emit(m, xs, ys)

Я должен иметь возможность выбирать новые параметры (щелкнуть виджет списка), запустить новый поток из пула потоков, который запускает расчет и отправляет результаты для построения графика. Когда за короткое время выбрано слишком много параметров, приложение аварийно завершает работу. Этого не происходит, когда все происходит в основном потоке.

Может ли кто-нибудь сказать мне, почему приложение может падать, и предложить решения, чтобы исправить сбой?


person Andrew    schedule 14.02.2019    source источник
comment
предоставьте минимальный воспроизводимый пример   -  person eyllanesc    schedule 14.02.2019
comment
Я попытался сократить проблему только до того, что я считал важным, но я понимаю, что есть еще совсем немного. Разве это недостаточно минимально, недостаточно полно или недостаточно проверяемо?   -  person Andrew    schedule 15.02.2019
comment
Минимум сам по себе является наиболее легким для достижения, поскольку он включает в себя только стирание кода, но здесь мы не требуем, чтобы он был только минимальным, а был полным, то есть вам не нужно было исправлять код, чтобы воспроизвести вашу проблему, и если он соответствует вышеизложенному. Это означает, что он воспроизводим, и цель состоит в том, чтобы у нас был небольшой код, который при выполнении без проблем   -  person eyllanesc    schedule 15.02.2019
comment
[продолжение] мы могли бы проанализировать саму проблему. Если ваш код нуждается в исправлении кода, то это требует времени, которое мы не хотим вкладывать, еще одна проблема кода исправления заключается в том, что, возможно, в части кода вы не показываете эту проблему, поэтому мы никогда не сможем решить вашу проблему.   -  person eyllanesc    schedule 15.02.2019


Ответы (1)


Описываемое вами поведение сбоя указывает на то, что несколько потоков пытаются одновременно работать с одними и теми же данными/переменными. Поскольку вы не ограничиваете количество дополнительных потоков, которые могут быть развернуты, вполне вероятно, что это произойдет, если пользователь быстро активирует несколько рабочих потоков.

Есть несколько способов справиться с этим, которые вы выбираете в зависимости от ваших требований.

Копировать данные

Если сбой происходит из-за того, что два потока обращаются к одной и той же памяти, простое решение — сделать копию данных. Обратите внимание, что для списков, словарей и т. д. вам потребуется deepcopy данные, чтобы убедиться, что вложенные значения также скопированы. Для больших данных это может вызвать некоторые накладные расходы.

Вы не включили свой метод calc в свой пример, поэтому я не могу предложить, как это сделать в вашем случае, но в целом.

from copy import copy
data = 'a simple string'
data_c = copy(data)

Или глубокая копия

from copy import deepcopy
data = {'a':'dict', 'of':'items'}
data_c = deepcopy(data)

Замки рабочие

Если у вас есть конкретный рабочий процесс, который должен запускаться только по одному, вы можете реализовать примитивную блокировку. В приведенном ниже примере предполагается, что вы хотите, чтобы последний рабочий процесс выполнялся всегда, а все более ранние рабочие процессы будут отбрасываться при поступлении новой задачи.

self._worker_lock = False
self._worker_waiting = False

Каждый рабочий сигнал .finished должен быть связан с методом, который работает.

def worker_finised(self):
    self._worker_lock = False
    if self._worker_waiting: 
        QThreadPool.globalInstance().start(self._worker_waiting)
        self._worker_waiting = False

Это автоматически запустит следующего рабочего, когда текущий завершится. Если вы хотите, чтобы все рабочие процессы выполнялись, вы можете вместо этого использовать очередь list.

Наконец, при запуске рабочего.

def plot()
    worker = Worker(self.calc)
    worker.signals.finished.connect(self.plot_result)

    if self._worker_lock:
        self._worker_waiting = worker
    else:
        self._worker_lock
        QThreadPool.globalInstance().start(worker)

Ограничьте потоки до 1

Это глупый подход, и он также не позволяет вам использовать потоки для других вычислений и делает пул потоков бесполезным.

threadpool = QThreadPool()
threadpool. setMaxThreadCount(1)

Даже в этом случае этого может быть недостаточно, если методы draw и calc все еще могут выполняться одновременно, вы все равно получите сбои. Но если ваши потоки конфликтуют только друг с другом, этого может быть достаточно.

person mfitzp    schedule 17.02.2019