A common problem when building Python GUI applications is the interface "locking up" when attempting to perform long-running background tasks. In this tutorial, we'll cover quick ways to achieve concurrent execution in PyQt6.
If you'd like to run external programs (such as command-line utilities) from your applications, check out the Using QProcess to run external programs tutorial.
Background: The frozen GUI issue
Applications based on Qt (like most GUI applications) are based on events. This means that execution is driven in response to user interaction, signals, and timers. In an event-driven application, clicking a button creates an event that your application subsequently handles to produce some expected output. Events are pushed onto and taken off an event queue and processed sequentially.
In PyQt, we create an app with the following code:
app = QApplication([])
window = MainWindow()
app.exec()
The event loop starts when you call .exec() on the QApplication object and runs within the same thread as your Python code. The thread that runs this event loop, commonly referred to as the GUI thread, also handles all window communication with the host operating system.
By default, any execution triggered by the event loop will also run synchronously within this thread. In practice, this means that whenever your PyQt application is busy doing something, communication with the window and interaction with the GUI are frozen.
If what you're doing is simple, and it returns control to the GUI loop quickly, the GUI freeze will be imperceptible to the user. However, if you need to perform longer-running tasks, for example, opening and writing a large file, downloading some data, or rendering a high-resolution image, there are going to be problems.
To your user, the application will appear to be unresponsive (because it is). Because your app is no longer communicating with the OS, on macOS, if you click on your app, you will see the spinning wheel of death. And, nobody wants that.
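To make the freeze concrete, here is a Qt-free sketch of a single-threaded event loop. The run_event_loop function and the event names are hypothetical stand-ins, not Qt APIs, and time is simulated in milliseconds rather than actually slept. It illustrates that every event queued behind a slow handler has to wait for it.

```python
from collections import deque


def run_event_loop(events):
    """Process (name, cost_ms) events one at a time, as a single GUI thread would.

    This is a simulation: handler cost is added to a fake clock instead of
    actually blocking.
    """
    clock_ms = 0
    log = []
    queue = deque(events)
    while queue:
        name, cost_ms = queue.popleft()
        started_ms = clock_ms
        clock_ms += cost_ms  # nothing else is handled while this handler runs
        log.append((name, started_ms, clock_ms))
    return log


# A repaint, then a slow button handler, then a timer tick queued behind it.
log = run_event_loop([("paint", 10), ("button_click", 5000), ("timer_tick", 10)])
for name, started_ms, finished_ms in log:
    print(f"{name}: started at {started_ms} ms, finished at {finished_ms} ms")
```

In this simulation the timer tick cannot start until 5010 ms, even though it only needs 10 ms of work itself.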
The solution is to move your long-running tasks out of the GUI thread into another thread. PyQt provides a straightforward interface for this.
Preparation: A minimal stub app
To demonstrate multi-threaded execution, we need an application to work with. Below is a minimal stub application for PyQt that will allow us to demonstrate multithreading and see the outcome in action. Simply copy and paste this into a new file and save it with an appropriate filename, like multithread.py. The remainder of the code will be added to this file (there is also a complete working example at the bottom if you're impatient):
import time

from PyQt6.QtCore import (
    QTimer,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


class MainWindow(QMainWindow):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.counter = 0
        layout = QVBoxLayout()
        self.label = QLabel("Start")
        button = QPushButton("DANGER!")
        button.pressed.connect(self.oh_no)
        layout.addWidget(self.label)
        layout.addWidget(button)
        w = QWidget()
        w.setLayout(layout)
        self.setCentralWidget(w)
        self.show()
        self.timer = QTimer()
        self.timer.setInterval(1000)
        self.timer.timeout.connect(self.recurring_timer)
        self.timer.start()

    def oh_no(self):
        time.sleep(5)

    def recurring_timer(self):
        self.counter += 1
        self.label.setText(f"Counter: {self.counter}")


app = QApplication([])
window = MainWindow()
app.exec()
Run the app as for any other Python application:
$ python multithread.py
You will see a demonstration window with a number counting upwards. This count is generated by a simple recurring timer, firing once per second. Think of this as our event loop indicator (or GUI thread indicator), a simple way to let us know that our application is ticking over normally. There is also a button with the word "DANGER!". Push it.
You'll notice that each time you push the button, the counter stops ticking, and your application freezes entirely. On Windows, you may see the window turn pale, indicating it is not responding, while on macOS, you'll get the spinning wheel of death.
The wrong approach
Avoid doing this in your code.
What appears as a frozen interface is the main Qt event loop being blocked from processing (and responding to) window events. Your clicks on the window are still registered by the host OS and sent to your application, but because it's sat in your big ol' lump of code (calling time.sleep()), it can't accept or react to them. They have to wait until your code passes control back to Qt.
The quickest and perhaps most logical way to get around this issue is to accept events from within your code. This allows Qt to continue to respond to the host OS and your application will stay responsive. You can do this easily by using the static processEvents() method on the QApplication class.
For example, our long-running time.sleep() call could be broken down into five 1-second sleeps, with a call to processEvents() before each one. The code for this would be:
    def oh_no(self):
        for n in range(5):
            QApplication.processEvents()
            time.sleep(1)
Now, when you push the DANGER! button, your app runs as before. However, now QApplication.processEvents() intermittently passes control back to Qt, and allows it to respond to events as normal. Qt will then accept events and handle them before returning to run the remainder of your code.
This approach works, but it's horrible for a few reasons, including the following:
- When you pass control back to Qt, your code is no longer running. This means that whatever long-running task you're trying to do will take longer. That is definitely not what you want.
- When you have multiple long-running tasks within your application, with each calling QApplication.processEvents() to keep things ticking, your application's behavior can be unpredictable.
- Processing events outside the main event loop (app.exec()) causes your application to branch off into handling code (e.g. for triggered slots or events) while within your loop. If your code depends on or responds to an external state, then this can cause undefined behavior.
The code below demonstrates the last point in action:
import time

from PyQt6.QtCore import (
    QTimer,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


class MainWindow(QMainWindow):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.counter = 0
        layout = QVBoxLayout()
        self.label = QLabel("Start")
        button = QPushButton("DANGER!")
        button.pressed.connect(self.oh_no)
        c = QPushButton("?")
        c.pressed.connect(self.change_message)
        layout.addWidget(self.label)
        layout.addWidget(button)
        layout.addWidget(c)
        w = QWidget()
        w.setLayout(layout)
        self.setCentralWidget(w)
        self.show()
        self.timer = QTimer()
        self.timer.setInterval(1000)
        self.timer.timeout.connect(self.recurring_timer)
        self.timer.start()

    def change_message(self):
        self.message = "OH NO"

    def oh_no(self):
        self.message = "Pressed"
        for n in range(100):
            time.sleep(0.1)
            self.label.setText(self.message)
            QApplication.processEvents()

    def recurring_timer(self):
        self.counter += 1
        self.label.setText(f"Counter: {self.counter}")


app = QApplication([])
window = MainWindow()
app.exec()
If you run this code you'll see the counter as before. Pressing DANGER! will change the displayed text to "Pressed", as defined at the entry point to the oh_no() method. However, if you press the "?" button while oh_no() is still running, you'll see that the message changes. The state is being changed from outside your event loop.
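The same re-entrancy can be reproduced without Qt. The sketch below uses a hypothetical FakeApp class whose process_events() method stands in for QApplication.processEvents(), and it simulates the "?" click by queuing a callback partway through the task. It is only meant to show how a handler ends up running inside your long-running method.

```python
from collections import deque


class FakeApp:
    """Hypothetical stand-in for an app whose task pumps events mid-loop."""

    def __init__(self):
        self.pending = deque()  # callbacks waiting to be handled
        self.message = ""
        self.shown = []  # what the label would have displayed at each step

    def process_events(self):
        # Run everything that has queued up, in order.
        while self.pending:
            self.pending.popleft()()

    def change_message(self):
        self.message = "OH NO"

    def oh_no(self):
        self.message = "Pressed"
        for step in range(3):
            if step == 1:
                # Simulate the user pressing "?" while the task is running.
                self.pending.append(self.change_message)
            self.shown.append(self.message)
            self.process_events()  # change_message() runs inside oh_no()


app = FakeApp()
app.oh_no()
print(app.shown)  # ['Pressed', 'Pressed', 'OH NO']
```

The value that oh_no() set at the start is no longer the value it sees by the final step, even though oh_no() itself never changed it again.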
Use threads and processes
If you take a step back and think about what you want to happen in your application, then you can probably sum it up with "stuff to happen at the same time as other stuff happens".
There are two main approaches to running independent tasks within a PyQt application:
- Threads
- Processes
Threads share the same memory space, so they are quick to start up and consume minimal resources. The shared memory makes it trivial to pass data between threads. However, reading or writing memory from different threads can lead to race conditions or segfaults.
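As a rough illustration of shared memory between threads, the following sketch uses Python's standard threading module rather than Qt. The state dictionary and add_many function are made up for the example. All four threads update the same object, and a lock guards each update so the read-modify-write steps cannot interleave.

```python
import threading

state = {"counter": 0}  # shared by every thread in the process
lock = threading.Lock()


def add_many(times):
    """Increment the shared counter, holding the lock for each update."""
    for _ in range(times):
        with lock:  # serialise the read-modify-write on the shared value
            state["counter"] += 1


threads = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(state["counter"])  # 40000
```

No copying or message passing is needed to combine the results, which is the convenience of threads. The lock is the price paid for that convenience.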
In Python, there is the added issue that multiple threads are bound by the Global Interpreter Lock (GIL), meaning that Python code which doesn't release the GIL can only execute in one thread at a time. However, this is not a major issue with PyQt, where most of the time is spent outside of Python.
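A small experiment, again with the standard threading module, shows what "releasing the GIL" means in practice. Blocking calls such as time.sleep() release the lock while they wait, so several waiting threads overlap. The thread count and durations below are arbitrary choices for the sketch.

```python
import threading
import time


def wait_a_bit():
    time.sleep(0.2)  # sleeping releases the GIL, so other threads can run


start = time.perf_counter()
threads = [threading.Thread(target=wait_a_bit) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# Five 0.2-second waits overlap instead of adding up to a full second.
print(f"Elapsed: {elapsed:.2f} s")
```

Pure-Python number crunching would not overlap in this way, because that code holds the GIL while it runs.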
Processes use separate memory space and an entirely separate Python interpreter. They sidestep any potential problems with Python's GIL but at the cost of slower start-up times, larger memory overhead, and complexity in sending and receiving data.
Processes in Qt are well suited to running and communicating with external programs. However, for simplicity's sake, threads are usually the best choice unless you have a good reason to use processes (see caveats later).
There is nothing stopping you from using pure Python threading or process-based approaches within your PyQt application. In the following sections, though, you'll rely on Qt's threading classes.
QRunnable and the QThreadPool
Favor this approach in your code.
Qt provides a straightforward interface for running jobs or tasks in other threads, which is nicely supported in PyQt. This interface is built around two classes:
- QRunnable: The container for the work you want to perform.
- QThreadPool: The method by which you pass that work to alternate threads.
The neat thing about using QThreadPool is that it handles queuing and executing workers for you. Other than queuing up jobs and retrieving the results, there is not much to do.
To define a custom QRunnable, you can subclass the base QRunnable class. Then, place the code you wish to execute within the run() method. The following is an implementation of our long-running time.sleep() job as a QRunnable.
Go ahead and add the following code to multithread.py, above the MainWindow class definition, and don't forget to import QRunnable and pyqtSlot from PyQt6.QtCore:
class Worker(QRunnable):
    """Worker thread."""

    @pyqtSlot()
    def run(self):
        """Your long-running job goes in this method."""
        print("Thread start")
        time.sleep(5)
        print("Thread complete")
Executing our long-running job in another thread is simply a matter of creating an instance of the Worker and passing it to our QThreadPool instance. It will be executed automatically.
Next, import QThreadPool from PyQt6.QtCore and add the following code to the __init__() method to set up our thread pool:
        self.threadpool = QThreadPool()
        thread_count = self.threadpool.maxThreadCount()
        print(f"Multithreading with maximum {thread_count} threads")
Finally, update the oh_no() method as follows:
    def oh_no(self):
        worker = Worker()
        self.threadpool.start(worker)
Now, clicking the DANGER! button will create a worker to handle the (long-running) job and spin that off into another thread via the thread pool. If there are not enough threads available to process incoming workers, they'll be queued and executed in order at a later time.
Try it out, and you'll see that your application now handles you bashing the button with no problems.
Check what happens if you hit the button multiple times. You should see your threads executed immediately up to the number reported by maxThreadCount(). If you press the button again after there are already this number of active workers, then the subsequent workers will be queued until a thread becomes available.
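The queueing behavior is easier to observe with a deliberately small pool. The sketch below is an analogy using Python's concurrent.futures.ThreadPoolExecutor, not QThreadPool itself. The limit of two threads, the job function, and the stats dictionary are all assumptions made for the demonstration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_THREADS = 2  # assumed small limit so the queueing is easy to see
lock = threading.Lock()
stats = {"active": 0, "peak": 0, "done": 0}


def job():
    with lock:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
    time.sleep(0.05)  # stand-in for real work
    with lock:
        stats["active"] -= 1
        stats["done"] += 1


# Submit more jobs than there are threads; the extras wait in the pool's queue.
with ThreadPoolExecutor(max_workers=MAX_THREADS) as pool:
    for _ in range(6):
        pool.submit(job)

print(f"Jobs finished: {stats['done']}, peak concurrency: {stats['peak']}")
```

All six jobs complete, but never more than MAX_THREADS of them run at once.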
Improved QRunnable
If you want to pass custom data into the runner function, you can do so via __init__(), and then have access to the data via self from within the run() slot:
class Worker(QRunnable):
    """Worker thread.

    :param args: Arguments to make available to the run code
    :param kwargs: Keyword arguments to make available to the run code
    """

    def __init__(self, *args, **kwargs):
        super().__init__()
        self.args = args
        self.kwargs = kwargs

    @pyqtSlot()
    def run(self):
        """Initialise the runner function with passed self.args, self.kwargs."""
        print(self.args, self.kwargs)
We can take advantage of the fact that Python functions are objects and pass in the function to execute rather than subclassing QRunnable for each runner function. In the following construction we only require a single Worker class to handle all of our jobs:
class Worker(QRunnable):
    """Worker thread.

    Inherits from QRunnable to handle worker thread setup, signals and wrap-up.

    :param fn: The function callback to run on this worker thread.
        Supplied args and kwargs will be passed through to the runner.
    :type fn: function
    :param args: Arguments to pass to the callback function
    :param kwargs: Keywords to pass to the callback function
    """

    def __init__(self, fn, *args, **kwargs):
        super().__init__()
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    @pyqtSlot()
    def run(self):
        """Initialise the runner function with passed args, kwargs."""
        self.fn(*self.args, **self.kwargs)
You can now pass in any Python function and have it executed in a separate thread. Go ahead and update MainWindow with the following code:
    def execute_this_fn(self):
        print("Hello!")

    def oh_no(self):
        # Pass the function to execute
        worker = Worker(
            self.execute_this_fn
        )  # Any other args, kwargs are passed to the run function
        # Execute
        self.threadpool.start(worker)
Now, when you click DANGER!, the app will print Hello! to your terminal without affecting the counter.
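To see how extra positional and keyword arguments flow through to the function, here is a Qt-free sketch of the same pattern. FunctionJob is a hypothetical stand-in for the Worker class, run on a plain threading.Thread instead of a QThreadPool, and greet is an invented example function.

```python
import threading


class FunctionJob:
    """Qt-free stand-in for Worker: stores a callable and its arguments."""

    def __init__(self, fn, *args, **kwargs):
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        # Forward whatever was passed to __init__ on to the function.
        self.fn(*self.args, **self.kwargs)


greetings = []


def greet(name, punctuation="!"):
    greetings.append(f"Hello, {name}{punctuation}")


job = FunctionJob(greet, "world", punctuation="?")
thread = threading.Thread(target=job.run)  # run the job off the main thread
thread.start()
thread.join()
print(greetings)  # ['Hello, world?']
```

Because the arguments are captured when the job is created, the function sees exactly the values that were supplied at that point.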
Thread Input/Output
Sometimes, it's helpful to be able to pass back state and data from running workers. This could include the outcome of calculations, raised exceptions, or ongoing progress (maybe for progress bars). Qt provides the signals and slots framework to allow you to do just that. Qt's signals and slots are thread-safe, allowing safe communication directly from running threads to your GUI thread.
Signals allow you to emit values, which are then picked up elsewhere in your code by slot functions that have been linked with the connect() method.
Below is a custom WorkerSignals class defined to contain a number of example signals. Note that custom signals can only be defined on objects derived from QObject. Since QRunnable is not derived from QObject, we can't define the signals there directly. A custom QObject to hold the signals is a quick solution:
# Requires QObject and pyqtSignal to be imported from PyQt6.QtCore.
class WorkerSignals(QObject):
    """Signals from a running worker thread.

    finished
        No data

    error
        tuple (exctype, value, traceback.format_exc())

    result
        object data returned from processing, anything
    """

    finished = pyqtSignal()
    error = pyqtSignal(tuple)
    result = pyqtSignal(object)
In this code, we've defined three custom signals:
- finished, which receives no data and indicates when the task is complete.
- error, which receives a tuple of Exception type, Exception value, and formatted traceback.
- result, which receives any object type from the executed function.
You may not find a need for all of these signals, but they are included to give an indication of what is possible. In the following code, we're going to implement a long-running task that makes use of these signals to provide useful information to the user:
# Requires `import sys` and `import traceback` at the top of the file.
class Worker(QRunnable):
    """Worker thread.

    Inherits from QRunnable to handle worker thread setup, signals and wrap-up.

    :param fn: The function callback to run on this worker thread.
        Supplied args and kwargs will be passed through to the runner.
    :type fn: function
    :param args: Arguments to pass to the callback function
    :param kwargs: Keywords to pass to the callback function
    """

    def __init__(self, fn, *args, **kwargs):
        super().__init__()
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()

    @pyqtSlot()
    def run(self):
        """Initialise the runner function with passed args, kwargs."""
        # Retrieve args/kwargs here; and fire processing using them
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit((exctype, value, traceback.format_exc()))
        else:
            self.signals.result.emit(result)  # Return the result of the processing
        finally:
            self.signals.finished.emit()  # Done
You can connect your own handler functions to the signals to receive notification of completion (or the result) of threads:
    def execute_this_fn(self):
        for n in range(0, 5):
            time.sleep(1)
        return "Done."

    def print_output(self, s):
        print(s)

    def thread_complete(self):
        print("THREAD COMPLETE!")

    def oh_no(self):
        # Pass the function to execute
        worker = Worker(
            self.execute_this_fn
        )  # Any other args, kwargs are passed to the run function
        worker.signals.result.connect(self.print_output)
        worker.signals.finished.connect(self.thread_complete)
        # Execute
        self.threadpool.start(worker)
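The order in which the worker reports back follows from the try/except/else/finally structure in run(). The sketch below mirrors that structure with plain Python callbacks standing in for the signals. The run_job function and the record_* helpers are hypothetical names used only for this illustration.

```python
def run_job(fn, on_result, on_error, on_finished):
    """Mirror the worker's run(): report a result or an error, then always finish."""
    try:
        result = fn()
    except Exception as exc:
        on_error(exc)
    else:
        on_result(result)
    finally:
        on_finished()


events = []


def record_result(value):
    events.append(("result", value))


def record_error(exc):
    events.append(("error", type(exc).__name__))


def record_finished():
    events.append(("finished", None))


def failing_job():
    raise ValueError("bad input")


run_job(lambda: "Done.", record_result, record_error, record_finished)
run_job(failing_job, record_result, record_error, record_finished)
print(events)
```

A successful job produces a result followed by finished, while a failing job produces an error followed by finished. A handler connected to finished therefore cannot assume that a result was delivered.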
You also often want to receive status information from long-running threads. This can be done by passing in callbacks to which your running code can send the information. You have two options here:
- Define new signals, allowing the handling to be performed using the event loop
- Use a regular Python function
In both cases, you'll need to pass these callbacks into your target function to be able to use them. The signal-based approach is used in the completed code below, where we pass a float back as an indicator of the thread's % progress.
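The second option, a regular Python function, can be sketched without Qt. The names report_progress and long_task are invented for this example, and a plain threading.Thread stands in for the thread pool. Note that a plain function is called in the worker thread, so unlike a signal handler it should not touch widgets directly.

```python
import threading

progress_log = []


def report_progress(percent):
    # Called from the worker thread; only records the value.
    progress_log.append(percent)


def long_task(progress_callback):
    for n in range(5):
        # Real work would go here.
        progress_callback(n * 100 / 4)
    return "Done."


thread = threading.Thread(
    target=long_task, kwargs={"progress_callback": report_progress}
)
thread.start()
thread.join()
print(progress_log)  # [0.0, 25.0, 50.0, 75.0, 100.0]
```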
The complete code
A complete working example is given below, showcasing the custom QRunnable worker together with the worker & progress signals. You should be able to easily adapt this code to any multithreaded application you develop.
import sys
import time
import traceback

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


class WorkerSignals(QObject):
    """Signals from a running worker thread.

    finished
        No data

    error
        tuple (exctype, value, traceback.format_exc())

    result
        object data returned from processing, anything

    progress
        float indicating % progress
    """

    finished = pyqtSignal()
    error = pyqtSignal(tuple)
    result = pyqtSignal(object)
    progress = pyqtSignal(float)


class Worker(QRunnable):
    """Worker thread.

    Inherits from QRunnable to handle worker thread setup, signals and wrap-up.

    :param fn: The function callback to run on this worker thread.
        Supplied args and kwargs will be passed through to the runner.
    :type fn: function
    :param args: Arguments to pass to the callback function
    :param kwargs: Keywords to pass to the callback function
    """

    def __init__(self, fn, *args, **kwargs):
        super().__init__()
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()
        # Add the callback to our kwargs
        self.kwargs["progress_callback"] = self.signals.progress

    @pyqtSlot()
    def run(self):
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit((exctype, value, traceback.format_exc()))
        else:
            self.signals.result.emit(result)
        finally:
            self.signals.finished.emit()


class MainWindow(QMainWindow):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.counter = 0
        layout = QVBoxLayout()
        self.label = QLabel("Start")
        button = QPushButton("DANGER!")
        button.pressed.connect(self.oh_no)
        layout.addWidget(self.label)
        layout.addWidget(button)
        w = QWidget()
        w.setLayout(layout)
        self.setCentralWidget(w)
        self.show()
        self.threadpool = QThreadPool()
        thread_count = self.threadpool.maxThreadCount()
        print(f"Multithreading with maximum {thread_count} threads")
        self.timer = QTimer()
        self.timer.setInterval(1000)
        self.timer.timeout.connect(self.recurring_timer)
        self.timer.start()

    def progress_fn(self, n):
        print(f"{n:.1f}% done")

    def execute_this_fn(self, progress_callback):
        for n in range(0, 5):
            time.sleep(1)
            progress_callback.emit(n * 100 / 4)
        return "Done."

    def print_output(self, s):
        print(s)

    def thread_complete(self):
        print("THREAD COMPLETE!")

    def oh_no(self):
        # Pass the function to execute
        worker = Worker(
            self.execute_this_fn
        )  # Any other args, kwargs are passed to the run function
        worker.signals.result.connect(self.print_output)
        worker.signals.finished.connect(self.thread_complete)
        worker.signals.progress.connect(self.progress_fn)
        # Execute
        self.threadpool.start(worker)

    def recurring_timer(self):
        self.counter += 1
        self.label.setText(f"Counter: {self.counter}")


app = QApplication([])
window = MainWindow()
app.exec()
Caveats
You may have spotted a slight flaw in this master plan—we are still using the event loop (and the GUI thread) to process our workers' output.
This isn't a problem when we're simply tracking progress, completion, or returning metadata. However, if you have workers that return large amounts of data — e.g. loading large files, performing complex analysis and needing (large) results, or querying databases — passing this data back through the GUI thread may cause performance problems and is best avoided.
Similarly, if your application uses a large number of threads and Python result handlers, you may come up against the limitations of the GIL. As mentioned previously, when using threads, execution of Python code is limited to a single thread at a time. The Python code that handles signals from your threads can be blocked by your workers and the other way around. Since blocking your slot functions blocks the event loop, this can directly impact GUI responsiveness.
In these cases, it is often better to investigate using a pure Python thread pool (e.g. concurrent.futures) to keep your processing and thread-event handling further isolated from your GUI. However, note that any Python GUI code can block other Python code unless it's in a separate process.
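As a starting point, a pure Python thread pool might look like the following minimal sketch. It uses concurrent.futures from the standard library. The word_count function and the documents dictionary are hypothetical placeholders for your own processing and data, and how you hand the results to your GUI is left open here.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def word_count(text):
    """Hypothetical stand-in for an expensive processing step."""
    return len(text.split())


documents = {
    "a.txt": "one two three",
    "b.txt": "four five",
    "c.txt": "six",
}

results = {}
with ThreadPoolExecutor(max_workers=3) as pool:
    # Map each future back to the document it belongs to.
    futures = {pool.submit(word_count, text): name for name, text in documents.items()}
    for future in as_completed(futures):
        results[futures[future]] = future.result()

print(sorted(results.items()))  # [('a.txt', 3), ('b.txt', 2), ('c.txt', 1)]
```

Here the results are collected as each job completes, without involving the Qt event loop at all.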