(c) Denis Simonov, edited and adjusted by Alexey Kovyazin
This material is sponsored and created with the support of IBSurgeon, vendor of HQbird (an advanced distribution of Firebird) and supplier of performance optimization, migration, and technical support services for Firebird. The material is licensed under the Public Documentation License.
Preface
The Firebird 5.0 DBMS introduced many new features. Among them, application developers are especially interested in the improvements to the SQL language, which make it faster and easier to develop business logic.
In this article we will introduce one of the most useful of these improvements: the new SKIP LOCKED clause.
Why was SKIP LOCKED developed?
One of the common tasks in application development is organizing a "task manager - executor" processing queue. For example, one or more task managers put jobs into a queue, while executors take outstanding jobs from the queue, execute them, and then update the job status. If there is only one executor, there are no problems. As the number of executors grows, they begin to compete for tasks and conflict with each other. The SKIP LOCKED clause helps developers implement this type of queue processing easily and without conflicts.
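To see why conflicts arise, consider a minimal Python model of the naive claim logic (a toy sketch with illustrative names, not the article's database code). Two executors that run the "first free task" query concurrently, before either has committed its UPDATE, pick the same row:

```python
def next_free_task(tasks):
    # Analogue of: SELECT ... WHERE STARTED IS FALSE ORDER BY ID FETCH FIRST ROW ONLY
    return next((t for t in tasks if not t["started"]), None)

tasks = [{"id": 1, "started": False}, {"id": 2, "started": False}]

# Both executors run the query before either has committed its UPDATE:
seen_by_a = next_free_task(tasks)
seen_by_b = next_free_task(tasks)

# They picked the same row, so one of the two UPDATEs will fail with
# "update conflicts with concurrent update" and that executor must retry.
print(seen_by_a["id"], seen_by_b["id"])  # -> 1 1
```

The more executors there are, and the faster each task completes, the more often two of them land on the same row at the same time.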
Preparing the Database
Let’s try to implement a task processing queue. To do this, we will create a test database containing the QUEUE_TASK table. Task managers will add tasks to this table, and executors will take free tasks and complete them. The database creation script with comments is given below:
CREATE DATABASE 'inet://localhost:3055/c:\fbdata\5.0\queue.fdb'
USER SYSDBA PASSWORD 'masterkey'
DEFAULT CHARACTER SET UTF8;
CREATE DOMAIN D_QUEUE_TASK_STATUS
AS SMALLINT CHECK(VALUE IN (0, 1));
COMMENT ON DOMAIN D_QUEUE_TASK_STATUS IS 'Task completion status';
CREATE TABLE QUEUE_TASK (
ID BIGINT GENERATED BY DEFAULT AS IDENTITY NOT NULL,
NAME VARCHAR(50) NOT NULL,
STARTED BOOLEAN DEFAULT FALSE NOT NULL,
WORKER_ID BIGINT,
START_TIME TIMESTAMP,
FINISH_TIME TIMESTAMP,
FINISH_STATUS D_QUEUE_TASK_STATUS,
STATUS_TEXT VARCHAR(100),
CONSTRAINT PK_QUEUE_TASK PRIMARY KEY(ID)
);
COMMENT ON TABLE QUEUE_TASK IS 'Task queue';
COMMENT ON COLUMN QUEUE_TASK.ID IS 'Task Identifier';
COMMENT ON COLUMN QUEUE_TASK.NAME IS 'Task Name';
COMMENT ON COLUMN QUEUE_TASK.STARTED IS 'Flag that the task has been accepted for processing';
COMMENT ON COLUMN QUEUE_TASK.WORKER_ID IS 'ID of Executor';
COMMENT ON COLUMN QUEUE_TASK.START_TIME IS 'Task execution time start';
COMMENT ON COLUMN QUEUE_TASK.FINISH_TIME IS 'Task execution time finish';
COMMENT ON COLUMN QUEUE_TASK.FINISH_STATUS IS 'The status with which the task completed: 0 - successfully, 1 - with error';
COMMENT ON COLUMN QUEUE_TASK.STATUS_TEXT IS 'Status text. If the task is completed without errors, then "OK", otherwise the error text';
To add a new task, execute the following command:
INSERT INTO QUEUE_TASK(NAME) VALUES (?)
In this example, we pass only the task name; in practice, there may be more parameters.
Each executor must select one outstanding task and mark it as "Taken for processing".
An executor can get a free task using the following query:
SELECT ID, NAME
FROM QUEUE_TASK
WHERE STARTED IS FALSE
ORDER BY ID
FETCH FIRST ROW ONLY
Next, the executor marks the task as "Taken for processing", setting the task start time and the executor identifier. This is done with the following command:
UPDATE QUEUE_TASK
SET
STARTED = TRUE,
WORKER_ID = ?,
START_TIME = CURRENT_TIMESTAMP
WHERE ID = ?
After the task is accepted for processing, its actual execution begins. When a task completes, the completion time and completion status must be recorded. The task may complete with an error; in this case, the appropriate status is set and the error text is saved.
UPDATE QUEUE_TASK
SET
FINISH_STATUS = ?,
STATUS_TEXT = ?,
FINISH_TIME = CURRENT_TIMESTAMP
WHERE ID = ?
Script simulating a job queue
Let’s test our idea by writing a simple script in Python.
For the script, we will need to install two libraries:
pip install firebird-driver
pip install prettytable
Now you can start writing the script. The script is written to run under Windows, however it can also be run under Linux by changing some constants and the path to the fbclient
library. Let’s save the written script to the file queue_exec.py
:
#!/usr/bin/python3

import concurrent.futures as pool
import logging
import random
import time

from firebird.driver import connect, DatabaseError
from firebird.driver import driver_config
from firebird.driver import tpb, Isolation, TraAccessMode
from firebird.driver.core import TransactionManager

from prettytable import PrettyTable

driver_config.fb_client_library.value = "c:\\firebird\\5.0\\fbclient.dll"

DB_URI = 'inet://localhost:3055/c:\\fbdata\\5.0\\queue.fdb'
DB_USER = 'SYSDBA'
DB_PASSWORD = 'masterkey'
DB_CHARSET = 'UTF8'

WORKERS_COUNT = 4  # Number of executors
WORKS_COUNT = 40   # Number of tasks

# set up logging to console
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
logging.basicConfig(level=logging.DEBUG,
                    handlers=[stream_handler])


class Worker:
    """Class Worker is an executor"""

    def __init__(self, worker_id: int):
        self.worker_id = worker_id

    @staticmethod
    def __next_task(tnx: TransactionManager):
        """Retrieves the next task from the queue.

        Arguments:
            tnx: The transaction in which the query is executed
        """
        cur = tnx.cursor()
        cur.execute("""
            SELECT ID, NAME
            FROM QUEUE_TASK
            WHERE STARTED IS FALSE
            ORDER BY ID
            FETCH FIRST ROW ONLY
        """)
        row = cur.fetchone()
        cur.close()
        return row

    def __on_start_task(self, tnx: TransactionManager, task_id: int) -> None:
        """Fires when task execution starts.

        Marks the task as running and sets its start time.

        Arguments:
            tnx: The transaction in which the query is executed
            task_id: Task ID
        """
        cur = tnx.cursor()
        cur.execute(
            """
            UPDATE QUEUE_TASK
            SET
                STARTED = TRUE,
                WORKER_ID = ?,
                START_TIME = CURRENT_TIMESTAMP
            WHERE ID = ?
            """,
            (self.worker_id, task_id,)
        )

    @staticmethod
    def __on_finish_task(tnx: TransactionManager, task_id: int, status: int, status_text: str) -> None:
        """Fires when a task completes.

        Sets the task completion time and the status with which the task completed.

        Arguments:
            tnx: The transaction in which the query is executed
            task_id: Task ID
            status: Completion status code. 0 - successful, 1 - completed with error
            status_text: Completion status text. If successful, "OK",
                otherwise the error text.
        """
        cur = tnx.cursor()
        cur.execute(
            """
            UPDATE QUEUE_TASK
            SET
                FINISH_STATUS = ?,
                STATUS_TEXT = ?,
                FINISH_TIME = CURRENT_TIMESTAMP
            WHERE ID = ?
            """,
            (status, status_text, task_id,)
        )

    def on_task_execute(self, task_id: int, name: str) -> None:
        """This method is given as an example of a function that performs some task.

        In real applications it could be different and take a different set of parameters.

        Arguments:
            task_id: Task ID
            name: Task name
        """
        # pick a random delay
        t = random.randint(1, 4)
        time.sleep(t * 0.01)
        # to demonstrate that a task can fail,
        # raise an exception for one of the random values
        if t == 3:
            raise Exception("Some error")

    def run(self) -> int:
        """Task execution loop"""
        conflict_counter = 0
        # For parallel execution, each worker must have its own connection to the database.
        with connect(DB_URI, user=DB_USER, password=DB_PASSWORD, charset=DB_CHARSET) as con:
            tnx = con.transaction_manager(tpb(Isolation.SNAPSHOT, lock_timeout=0, access_mode=TraAccessMode.WRITE))
            while True:
                # Extract the next outstanding task and mark it as being executed.
                # Since the task may fail during execution, the start flag
                # is set in a separate transaction.
                tnx.begin()
                try:
                    task_row = self.__next_task(tnx)
                    # If there are no more tasks, terminate the thread
                    if task_row is None:
                        tnx.commit()
                        break
                    (task_id, name,) = task_row
                    self.__on_start_task(tnx, task_id)
                    tnx.commit()
                except DatabaseError as err:
                    if err.sqlstate == "40001":
                        conflict_counter = conflict_counter + 1
                        logging.error(f"Worker: {self.worker_id}, Task: {self.worker_id}, Error: {err}")
                    else:
                        logging.exception('')
                    tnx.rollback()
                    continue

                # Execute the task
                status = 0
                status_text = "OK"
                try:
                    self.on_task_execute(task_id, name)
                except Exception as err:
                    # If an error occurs during execution,
                    # set the appropriate status code and save the error text.
                    status = 1
                    status_text = f"{err}"
                    # logging.error(status_text)

                # Save the task completion time and record its completion status.
                tnx.begin()
                try:
                    self.__on_finish_task(tnx, task_id, status, status_text)
                    tnx.commit()
                except DatabaseError as err:
                    if err.sqlstate == "40001":
                        conflict_counter = conflict_counter + 1
                        logging.error(f"Worker: {self.worker_id}, Task: {self.worker_id}, Error: {err}")
                    else:
                        logging.exception('')
                    tnx.rollback()
        return conflict_counter


def main():
    print(f"Start execute script. Works: {WORKS_COUNT}, workers: {WORKERS_COUNT}\n")

    with connect(DB_URI, user=DB_USER, password=DB_PASSWORD, charset=DB_CHARSET) as con:
        # Clean previous tasks from the queue
        con.begin()
        with con.cursor() as cur:
            cur.execute("DELETE FROM QUEUE_TASK")
        con.commit()
        # The task manager puts 40 tasks into the queue
        con.begin()
        with con.cursor() as cur:
            cur.execute(
                """
                EXECUTE BLOCK (CNT INTEGER = ?)
                AS
                    DECLARE I INTEGER;
                BEGIN
                    I = 0;
                    WHILE (I < CNT) DO
                    BEGIN
                        I = I + 1;
                        INSERT INTO QUEUE_TASK(NAME)
                        VALUES ('Task ' || :I);
                    END
                END
                """,
                (WORKS_COUNT,)
            )
        con.commit()

    # Create the executors
    workers = map(lambda worker_id: Worker(worker_id), range(WORKERS_COUNT))
    with pool.ProcessPoolExecutor(max_workers=WORKERS_COUNT) as executor:
        futures = map(lambda worker: executor.submit(worker.run), workers)
        conflicts = map(lambda future: future.result(), pool.as_completed(futures))
        conflict_count = sum(conflicts)

    # Read the statistics
    with connect(DB_URI, user=DB_USER, password=DB_PASSWORD, charset=DB_CHARSET) as con:
        cur = con.cursor()
        cur.execute("""
            SELECT
                COUNT(*) AS CNT_TASK,
                COUNT(*) FILTER(WHERE STARTED IS TRUE AND FINISH_TIME IS NULL) AS CNT_ACTIVE_TASK,
                COUNT(*) FILTER(WHERE FINISH_TIME IS NOT NULL) AS CNT_FINISHED_TASK,
                COUNT(*) FILTER(WHERE FINISH_STATUS = 0) AS CNT_SUCCESS,
                COUNT(*) FILTER(WHERE FINISH_STATUS = 1) AS CNT_ERROR,
                AVG(DATEDIFF(MILLISECOND FROM START_TIME TO FINISH_TIME)) AS AVG_ELAPSED_TIME,
                DATEDIFF(MILLISECOND FROM MIN(START_TIME) TO MAX(FINISH_TIME)) AS SUM_ELAPSED_TIME,
                CAST(? AS BIGINT) AS CONFLICTS
            FROM QUEUE_TASK
        """, (conflict_count,))
        row = cur.fetchone()
        cur.close()

        stat_columns = ["TASKS", "ACTIVE_TASKS", "FINISHED_TASKS", "SUCCESS", "ERROR", "AVG_ELAPSED_TIME",
                        "SUM_ELAPSED_TIME", "CONFLICTS"]
        stat_table = PrettyTable(stat_columns)
        stat_table.add_row(row)
        print("\nStatistics:")
        print(stat_table)

        cur = con.cursor()
        cur.execute("""
            SELECT
                ID,
                NAME,
                STARTED,
                WORKER_ID,
                START_TIME,
                FINISH_TIME,
                FINISH_STATUS,
                STATUS_TEXT
            FROM QUEUE_TASK
        """)
        rows = cur.fetchall()
        cur.close()

        columns = ["ID", "NAME", "STARTED", "WORKER", "START_TIME", "FINISH_TIME",
                   "STATUS", "STATUS_TEXT"]
        table = PrettyTable(columns)
        table.add_rows(rows)
        print("\nTasks:")
        print(table)


if __name__ == "__main__":
    main()
In this script, the task manager creates 40 tasks to be completed by 4 executors. Each executor runs in its own process. When the script finishes, it prints task execution statistics, the number of conflicts, and the tasks themselves.
Let’s run the script:
python ./queue_exec.py
Start execute script. Works: 40, workers: 4

ERROR:root:Worker: 2, Task: 2, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95695
ERROR:root:Worker: 2, Task: 2, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95697
ERROR:root:Worker: 2, Task: 2, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95703
ERROR:root:Worker: 2, Task: 2, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95706
ERROR:root:Worker: 0, Task: 0, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95713
ERROR:root:Worker: 2, Task: 2, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95722
ERROR:root:Worker: 3, Task: 3, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95722
ERROR:root:Worker: 1, Task: 1, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95722
ERROR:root:Worker: 1, Task: 1, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95728
ERROR:root:Worker: 0, Task: 0, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95734
ERROR:root:Worker: 0, Task: 0, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95736
ERROR:root:Worker: 1, Task: 1, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95741
ERROR:root:Worker: 1, Task: 1, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95744
ERROR:root:Worker: 0, Task: 0, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95749

Statistics:
+-------+--------------+----------------+---------+-------+------------------+------------------+-----------+
| TASKS | ACTIVE_TASKS | FINISHED_TASKS | SUCCESS | ERROR | AVG_ELAPSED_TIME | SUM_ELAPSED_TIME | CONFLICTS |
+-------+--------------+----------------+---------+-------+------------------+------------------+-----------+
|   40  |      0       |       40       |    28   |   12  |       43.1       |       1353       |     14    |
+-------+--------------+----------------+---------+-------+------------------+------------------+-----------+

Tasks:
+------+---------+---------+--------+--------------------------+--------------------------+--------+-------------+
|  ID  |   NAME  | STARTED | WORKER |        START_TIME        |       FINISH_TIME        | STATUS | STATUS_TEXT |
+------+---------+---------+--------+--------------------------+--------------------------+--------+-------------+
| 1341 |  Task 1 |   True  |   0    | 2023-07-06 15:35:29.9800 | 2023-07-06 15:35:30.0320 |   1    |  Some error |
| 1342 |  Task 2 |   True  |   0    | 2023-07-06 15:35:30.0420 | 2023-07-06 15:35:30.0800 |   1    |  Some error |
| 1343 |  Task 3 |   True  |   0    | 2023-07-06 15:35:30.0900 | 2023-07-06 15:35:30.1130 |   0    |      OK     |
| 1344 |  Task 4 |   True  |   0    | 2023-07-06 15:35:30.1220 | 2023-07-06 15:35:30.1450 |   0    |      OK     |
...
The script results show that the 4 executors constantly conflict over tasks. The faster tasks are completed and the more executors there are, the higher the likelihood of conflicts.
Clause SKIP LOCKED
How can we change our solution so that it works efficiently and without conflicts? This is where the new SKIP LOCKED clause in Firebird 5.0 comes in.
The SKIP LOCKED clause allows you to skip records that are already locked, and thus to work without conflicts. It can be used in queries where an update conflict is possible, that is, in SELECT ... WITH LOCK, UPDATE and DELETE queries. Let’s look at its syntax:
SELECT
  [FIRST ...] [SKIP ...]
  FROM <sometable>
  [WHERE ...]
  [PLAN ...]
  [ORDER BY ...]
  [{ ROWS ... } | {OFFSET ...} | {FETCH ...}]
  [FOR UPDATE [OF ...]]
  [WITH LOCK [SKIP LOCKED]]

UPDATE <sometable>
  SET ...
  [WHERE ...]
  [PLAN ...]
  [ORDER BY ...]
  [ROWS ...]
  [SKIP LOCKED]
  [RETURNING ...]

DELETE FROM <sometable>
  [WHERE ...]
  [PLAN ...]
  [ORDER BY ...]
  [ROWS ...]
  [SKIP LOCKED]
  [RETURNING ...]
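As the UPDATE syntax above suggests, a task could in principle also be claimed in a single statement, combining the SELECT and the first UPDATE of our executor. The following is a hypothetical sketch derived only from the syntax diagram above, not from the article's script, so verify it against your Firebird version before relying on it:

UPDATE QUEUE_TASK
SET STARTED = TRUE,
    WORKER_ID = ?,
    START_TIME = CURRENT_TIMESTAMP
WHERE STARTED IS FALSE
ORDER BY ID
ROWS 1
SKIP LOCKED
RETURNING ID, NAME

Locked rows are skipped, so concurrent executors would each claim a different row without conflicting.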
Job queue without conflicts
Let’s fix our script so that executors do not conflict over tasks. To do this, we need to slightly rewrite the query in the __next_task method of the Worker class.
    @staticmethod
    def __next_task(tnx: TransactionManager):
        """Retrieves the next task from the queue.

        Arguments:
            tnx: The transaction in which the query is executed
        """
        cur = tnx.cursor()
        cur.execute("""
            SELECT ID, NAME
            FROM QUEUE_TASK
            WHERE STARTED IS FALSE
            ORDER BY ID
            FETCH FIRST ROW ONLY
            FOR UPDATE WITH LOCK SKIP LOCKED
        """)
        row = cur.fetchone()
        cur.close()
        return row
Let’s run the script:
python ./queue_exec.py
Start execute script. Works: 40, workers: 4

Statistics:
+-------+--------------+----------------+---------+-------+------------------+------------------+-----------+
| TASKS | ACTIVE_TASKS | FINISHED_TASKS | SUCCESS | ERROR | AVG_ELAPSED_TIME | SUM_ELAPSED_TIME | CONFLICTS |
+-------+--------------+----------------+---------+-------+------------------+------------------+-----------+
|   40  |      0       |       40       |    32   |   8   |       39.1       |       1048       |     0     |
+-------+--------------+----------------+---------+-------+------------------+------------------+-----------+

Tasks:
+------+---------+---------+--------+--------------------------+--------------------------+--------+-------------+
|  ID  |   NAME  | STARTED | WORKER |        START_TIME        |       FINISH_TIME        | STATUS | STATUS_TEXT |
+------+---------+---------+--------+--------------------------+--------------------------+--------+-------------+
| 1381 |  Task 1 |   True  |   0    | 2023-07-06 15:57:22.0360 | 2023-07-06 15:57:22.0740 |   0    |      OK     |
| 1382 |  Task 2 |   True  |   0    | 2023-07-06 15:57:22.0840 | 2023-07-06 15:57:22.1130 |   0    |      OK     |
| 1383 |  Task 3 |   True  |   0    | 2023-07-06 15:57:22.1220 | 2023-07-06 15:57:22.1630 |   0    |      OK     |
| 1384 |  Task 4 |   True  |   0    | 2023-07-06 15:57:22.1720 | 2023-07-06 15:57:22.1910 |   0    |      OK     |
| 1385 |  Task 5 |   True  |   0    | 2023-07-06 15:57:22.2020 | 2023-07-06 15:57:22.2540 |   0    |      OK     |
| 1386 |  Task 6 |   True  |   0    | 2023-07-06 15:57:22.2620 | 2023-07-06 15:57:22.3220 |   0    |      OK     |
| 1387 |  Task 7 |   True  |   0    | 2023-07-06 15:57:22.3300 | 2023-07-06 15:57:22.3790 |   1    |  Some error |
...
This time there are no conflicts. Thus, in Firebird 5.0 you can use the SKIP LOCKED clause to avoid unnecessary update conflicts.
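The skipping behaviour itself can be modelled in a few lines of Python (a toy sketch with illustrative names, not the database code): a worker that sees a row locked by another transaction simply moves on to the next candidate instead of waiting or failing:

```python
def next_free_task_skip_locked(tasks, locked_ids):
    # Analogue of: ... FOR UPDATE WITH LOCK SKIP LOCKED --
    # rows locked by other transactions are skipped, not waited on.
    return next((t for t in tasks
                 if not t["started"] and t["id"] not in locked_ids), None)

tasks = [{"id": 1, "started": False}, {"id": 2, "started": False}]
locked = set()

task_a = next_free_task_skip_locked(tasks, locked)  # worker A takes task 1...
locked.add(task_a["id"])                            # ...and locks it
task_b = next_free_task_skip_locked(tasks, locked)  # worker B skips task 1, takes task 2

print(task_a["id"], task_b["id"])  # -> 1 2
```

Each concurrent worker therefore claims a different row, which is exactly why the conflict counter drops to zero.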
Next steps
Our job queue could be improved even further. Let’s look at the execution plan of the query:
SELECT
ID, NAME
FROM QUEUE_TASK
WHERE STARTED IS FALSE
ORDER BY ID
FETCH FIRST ROW ONLY
FOR UPDATE WITH LOCK SKIP LOCKED
Select Expression
    -> First N Records
        -> Write Lock
            -> Filter
                -> Table "QUEUE_TASK" Access By ID
                    -> Index "PK_QUEUE_TASK" Full Scan
This execution plan is not very good. The record from the QUEUE_TASK table is retrieved by navigating the primary key index, but this requires a full scan of that index. If the QUEUE_TASK table is not cleared (as we did in our script), then over time the selection of unprocessed tasks will become slower and slower.
You can create an index on the STARTED field. If the task manager constantly adds new tasks and the executors complete them, then the number of pending tasks is always much smaller than the number of completed ones, so this index will filter tasks effectively. Let’s check:
CREATE INDEX IDX_QUEUE_TASK_INACTIVE ON QUEUE_TASK(STARTED);
SELECT
ID, NAME
FROM QUEUE_TASK
WHERE STARTED IS FALSE
ORDER BY ID
FETCH FIRST ROW ONLY
FOR UPDATE WITH LOCK SKIP LOCKED;
Select Expression
    -> First N Records
        -> Write Lock
            -> Filter
                -> Table "QUEUE_TASK" Access By ID
                    -> Index "PK_QUEUE_TASK" Full Scan
                        -> Bitmap
                            -> Index "IDX_QUEUE_TASK_INACTIVE" Range Scan (full match)
This works, but now there are two indexes: one for filtering and one for navigation.
We can go further and create a composite index:
DROP INDEX IDX_QUEUE_TASK_INACTIVE;
CREATE INDEX IDX_QUEUE_TASK_INACTIVE ON QUEUE_TASK(STARTED, ID);
Select Expression
    -> First N Records
        -> Write Lock
            -> Filter
                -> Table "QUEUE_TASK" Access By ID
                    -> Index "IDX_QUEUE_TASK_INACTIVE" Range Scan (partial match: 1/2)
This is more efficient, since a single index is used for both filtering and navigation, and it is only partially scanned. However, such an index has a significant drawback: it contains entries for all rows, including completed tasks, so it will not be compact (and therefore not very fast).
To solve this problem, you can use another new feature from Firebird 5.0: partial indices.
A partial index is an index that is built on a subset of table rows defined by a conditional expression (this is called a partial index predicate). Such an index contains entries only for rows satisfying the predicate.
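A rough intuition for why this helps, as a Python sketch (a toy model with illustrative names, not how Firebird actually stores indexes): the partial index only keeps keys for rows matching the predicate, so its size tracks the number of pending tasks rather than the size of the whole table:

```python
# 10,000 tasks, of which all but the last 10 are already started.
table = {i: {"name": f"Task {i}", "started": i <= 9990}
         for i in range(1, 10001)}

# Full composite index on (STARTED, ID): one entry per row.
full_index = sorted((row["started"], i) for i, row in table.items())

# Partial index WHERE STARTED IS FALSE: entries only for pending rows.
partial_index = sorted(i for i, row in table.items() if not row["started"])

print(len(full_index), len(partial_index))  # -> 10000 10
```

However many finished tasks accumulate, the partial index stays as small as the backlog of pending work.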
Let’s create a partial index:
DROP INDEX IDX_QUEUE_TASK_INACTIVE;
CREATE INDEX IDX_QUEUE_TASK_INACTIVE ON QUEUE_TASK (STARTED, ID) WHERE (STARTED IS FALSE);
SELECT
ID, NAME
FROM QUEUE_TASK
WHERE STARTED IS FALSE
ORDER BY STARTED, ID
FETCH FIRST ROW ONLY
FOR UPDATE WITH LOCK SKIP LOCKED
Select Expression
    -> First N Records
        -> Write Lock
            -> Filter
                -> Table "QUEUE_TASK" Access By ID
                    -> Index "IDX_QUEUE_TASK_INACTIVE" Full Scan
A record from the QUEUE_TASK table is now retrieved by navigating the IDX_QUEUE_TASK_INACTIVE index. Note that the ORDER BY clause was changed to STARTED, ID so that the partial index can be used for ordered navigation. Although the index is scanned in full, the index itself is very compact, since it only contains keys for which the condition STARTED IS FALSE holds. In a normal task queue there are always far fewer such entries than records for completed tasks.
Summary
In this article, we demonstrated how to use the new SKIP LOCKED feature that appeared in Firebird 5.0, and also showed an example of partial indices, which also appeared in Firebird 5.0.
A DDL script for creating the database, as well as the Python script emulating a task queue, can be downloaded from the following links: