
Detailed New Features of Firebird, Part 3: Using the new SKIP LOCKED clause

(c) Denis Simonov, edited and adjusted by Alexey Kovyazin

This material was created with the sponsorship and support of IBSurgeon, vendor of HQbird (an advanced distribution of Firebird) and supplier of performance optimization, migration, and technical support services for Firebird. The material is licensed under the Public Documentation License.


Preface

Firebird 5.0 has introduced many innovations. Among them, application developers are especially interested in the new SQL language features, which make it faster and easier to develop business logic.
In this article we will introduce one of the most useful improvements: the new SKIP LOCKED clause.

Why was SKIP LOCKED developed?

A common task in application development is organizing a "task manager - executor" queue. For example, one or more task managers put jobs into a queue, and executors take an outstanding job from the queue, execute it, and then update the status of the job. If there is only one executor, there are no problems. As the number of executors grows, they start competing for tasks and conflicting with each other. The SKIP LOCKED clause helps developers implement this kind of queue processing easily and without conflicts.

Preparing the Database

Let’s try to implement a task processing queue. To do this, let’s create a test database and create the QUEUE_TASK table in it. Task managers will add tasks to this table, and executors will take free tasks and complete them. The database creation script with comments is given below:

CREATE DATABASE 'inet://localhost:3055/c:\fbdata\5.0\queue.fdb'
USER SYSDBA password 'masterkey'
DEFAULT CHARACTER SET UTF8;

CREATE DOMAIN D_QUEUE_TASK_STATUS
AS SMALLINT CHECK(VALUE IN (0, 1));

COMMENT ON DOMAIN D_QUEUE_TASK_STATUS IS 'Task completion status';

CREATE TABLE QUEUE_TASK (
  ID BIGINT GENERATED BY DEFAULT AS IDENTITY NOT NULL,
  NAME VARCHAR(50) NOT NULL,
  STARTED BOOLEAN DEFAULT FALSE NOT NULL,
  WORKER_ID BIGINT,
  START_TIME TIMESTAMP,
  FINISH_TIME TIMESTAMP,
  FINISH_STATUS D_QUEUE_TASK_STATUS,
  STATUS_TEXT VARCHAR(100),
  CONSTRAINT PK_QUEUE_TASK PRIMARY KEY(ID)
);

COMMENT ON TABLE QUEUE_TASK IS 'Task queue';
COMMENT ON COLUMN QUEUE_TASK.ID IS 'Task Identifier';
COMMENT ON COLUMN QUEUE_TASK.NAME IS 'Task Name';
COMMENT ON COLUMN QUEUE_TASK.STARTED IS 'Flag that the task has been accepted for processing';
COMMENT ON COLUMN QUEUE_TASK.WORKER_ID IS 'ID of Executor';
COMMENT ON COLUMN QUEUE_TASK.START_TIME IS 'Task execution start time';
COMMENT ON COLUMN QUEUE_TASK.FINISH_TIME IS 'Task execution finish time';
COMMENT ON COLUMN QUEUE_TASK.FINISH_STATUS IS 'The status with which the task completed: 0 - successfully, 1 - with error';
COMMENT ON COLUMN QUEUE_TASK.STATUS_TEXT IS 'Status text. If the task is completed without errors, then "OK", otherwise the error text';

To add a new task, execute the following statement:

INSERT INTO QUEUE_TASK(NAME) VALUES (?)

In this example, we pass only the task name; in practice, there may be more parameters.

Each executor must select one outstanding task and set its flag to "Taken for processing".

An executor can get a free task using the following query:

SELECT ID, NAME
FROM QUEUE_TASK
WHERE STARTED IS FALSE
ORDER BY ID
FETCH FIRST ROW ONLY

Next, the executor marks the task as "Taken for processing", sets the task start time and the executor identifier. This is done with the command:

UPDATE QUEUE_TASK
SET
    STARTED = TRUE,
    WORKER_ID = ?,
    START_TIME = CURRENT_TIMESTAMP
WHERE ID = ?

After the task is accepted for processing, the actual execution of the task begins. When a task is completed, it is necessary to set the completion time of the task and its status. The task may complete with an error; in this case, the appropriate status is set and the error text is saved.

UPDATE QUEUE_TASK
SET
    FINISH_STATUS = ?,
    STATUS_TEXT = ?,
    FINISH_TIME = CURRENT_TIMESTAMP
WHERE ID = ?
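The three statements above form a small task lifecycle: add, take, finish. As an illustration only, here is an in-memory Python model of that lifecycle; no database is involved, and the class and method names (QueueTask, InMemoryQueue, next_task and so on) are hypothetical. It is single-threaded on purpose: with several executors, the gap between next_task and start_task is exactly where conflicts come from.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional


@dataclass
class QueueTask:
    """In-memory stand-in for one QUEUE_TASK row (illustration only)."""
    id: int
    name: str
    started: bool = False
    worker_id: Optional[int] = None
    start_time: Optional[datetime] = None
    finish_time: Optional[datetime] = None
    finish_status: Optional[int] = None  # 0 = successful, 1 = error
    status_text: Optional[str] = None


class InMemoryQueue:
    """Hypothetical model of the INSERT / SELECT / UPDATE statements above."""

    def __init__(self) -> None:
        self.tasks: Dict[int, QueueTask] = {}
        self._next_id = 1  # plays the role of the IDENTITY column

    def add_task(self, name: str) -> int:
        # INSERT INTO QUEUE_TASK(NAME) VALUES (?)
        task = QueueTask(id=self._next_id, name=name)
        self.tasks[task.id] = task
        self._next_id += 1
        return task.id

    def next_task(self) -> Optional[QueueTask]:
        # SELECT ID, NAME ... WHERE STARTED IS FALSE ORDER BY ID FETCH FIRST ROW ONLY
        free = [t for t in self.tasks.values() if not t.started]
        return min(free, key=lambda t: t.id) if free else None

    def start_task(self, task_id: int, worker_id: int) -> None:
        # UPDATE ... SET STARTED = TRUE, WORKER_ID = ?, START_TIME = CURRENT_TIMESTAMP
        task = self.tasks[task_id]
        task.started = True
        task.worker_id = worker_id
        task.start_time = datetime.now()

    def finish_task(self, task_id: int, status: int, status_text: str) -> None:
        # UPDATE ... SET FINISH_STATUS = ?, STATUS_TEXT = ?, FINISH_TIME = CURRENT_TIMESTAMP
        if status not in (0, 1):  # same rule as the D_QUEUE_TASK_STATUS domain
            raise ValueError("status must be 0 or 1")
        task = self.tasks[task_id]
        task.finish_status = status
        task.status_text = status_text
        task.finish_time = datetime.now()


queue = InMemoryQueue()
queue.add_task("Task 1")
queue.add_task("Task 2")
task = queue.next_task()              # Task 1, the lowest free ID
queue.start_task(task.id, worker_id=0)
queue.finish_task(task.id, 0, "OK")
print(queue.next_task().name)         # Task 2
```

Because next_task and start_task are separate calls, two executors sharing this model could both receive Task 1 before either marks it as started; the database version of that race is what the rest of the article deals with.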

Script simulating a job queue

Let’s try to test our idea. To do this, let’s write a simple script in Python.

The script needs two libraries, which can be installed with pip:

pip install firebird-driver
pip install prettytable

Now you can start writing the script. It is written to run under Windows, but it can also be run under Linux by changing a few constants and the path to the fbclient library. Let’s save the script to the file queue_exec.py:

#!/usr/bin/python3

import concurrent.futures as pool
import logging
import random
import time

from firebird.driver import connect, DatabaseError
from firebird.driver import driver_config
from firebird.driver import tpb, Isolation, TraAccessMode
from firebird.driver.core import TransactionManager
from prettytable import PrettyTable

driver_config.fb_client_library.value = "c:\\firebird\\5.0\\fbclient.dll"

DB_URI = 'inet://localhost:3055/c:\\fbdata\\5.0\\queue.fdb'
DB_USER = 'SYSDBA'
DB_PASSWORD = 'masterkey'
DB_CHARSET = 'UTF8'

WORKERS_COUNT = 4  # Number of Executors
WORKS_COUNT = 40   # Number of Tasks

# set up logging to console
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)

logging.basicConfig(level=logging.DEBUG,
                    handlers=[stream_handler])

class Worker:
    """Class Worker is am executor"""

    def __init__(self, worker_id: int):
        self.worker_id = worker_id

    @staticmethod
    def __next_task(tnx: TransactionManager):
        """Retrieves the next task from the queue.

        Arguments:
            tnx: The transaction in which the request is executed
        """
        cur = tnx.cursor()

        cur.execute("""
            SELECT ID, NAME
            FROM QUEUE_TASK
            WHERE STARTED IS FALSE
            ORDER BY ID
            FETCH FIRST ROW ONLY
        """)

        row = cur.fetchone()
        cur.close()
        return row

    def __on_start_task(self, tnx: TransactionManager, task_id: int) -> None:
        """Fires when task execution starts.

        Sets the flag to the task to indicate that it is running, and sets the start time of the task.

        Arguments:
            tnx: The transaction in which the request is executed
            task_id: Task ID
        """
        cur = tnx.cursor()
        cur.execute(
            """
            UPDATE QUEUE_TASK
            SET
                STARTED = TRUE,
                WORKER_ID = ?,
                START_TIME = CURRENT_TIMESTAMP
            WHERE ID = ?
            """,
            (self.worker_id, task_id,)
        )

    @staticmethod
    def __on_finish_task(tnx: TransactionManager, task_id: int, status: int, status_text: str) -> None:
        """Fires when a task completes.

        Sets the task completion time and the status with which the task completed.

        Arguments:
            tnx: The transaction in which the request is executed
            task_id: Task ID
            status: Completion status code. 0 - successful, 1 - completed with error
            status_text: Completion status text. If successful, write "OK",
                otherwise the error text.
        """
        cur = tnx.cursor()
        cur.execute(
            """
                UPDATE QUEUE_TASK
                SET
                    FINISH_STATUS = ?,
                    STATUS_TEXT = ?,
                    FINISH_TIME = CURRENT_TIMESTAMP
                WHERE ID = ?
            """,
            (status, status_text, task_id,)
        )

    def on_task_execute(self, task_id: int, name: str) -> None:
        """This method is given as an example of a function to perform some task.

        In real problems it could be different and with a different set of parameters.

        Arguments:
            task_id: Task ID
            name: Task Name
        """
        # simulate some work with a random delay
        t = random.randint(1, 4)
        time.sleep(t * 0.01)
        # to show that a task can fail, raise an exception
        # for one of the four possible random values
        if t == 3:
            raise Exception("Some error")

    def run(self) -> int:
        """Task Execution"""
        conflict_counter = 0
        # For parallel execution, each worker process must have its own connection to the database.
        with connect(DB_URI, user=DB_USER, password=DB_PASSWORD, charset=DB_CHARSET) as con:
            tnx = con.transaction_manager(tpb(Isolation.SNAPSHOT, lock_timeout=0, access_mode=TraAccessMode.WRITE))
            while True:
                # Fetch the next outstanding task and mark it as taken for processing.
                # Since the task itself may fail, the start mark is set
                # in a separate transaction.
                tnx.begin()
                try:
                    task_row = self.__next_task(tnx)
                    # If the tasks are finished, we terminate the thread
                    if task_row is None:
                        tnx.commit()
                        break
                    (task_id, name,) = task_row
                    self.__on_start_task(tnx, task_id)
                    tnx.commit()
                except DatabaseError as err:
                    if err.sqlstate == "40001":
                        conflict_counter = conflict_counter + 1
                        logging.error(f"Worker: {self.worker_id}, Task: {self.worker_id}, Error: {err}")
                    else:
                        logging.exception('')
                    tnx.rollback()
                    continue

                # Execute task
                status = 0
                status_text = "OK"
                try:
                    self.on_task_execute(task_id, name)
                except Exception as err:
                    # If an error occurs during execution,
                    # then set the appropriate status code and save the error text.
                    status = 1
                    status_text = f"{err}"
                    # logging.error(status_text)

                # We save the task completion time and record its completion status.
                tnx.begin()
                try:
                    self.__on_finish_task(tnx, task_id, status, status_text)
                    tnx.commit()
                except DatabaseError as err:
                    if err.sqlstate == "40001":
                        conflict_counter = conflict_counter + 1
                        logging.error(f"Worker: {self.worker_id}, Task: {self.worker_id}, Error: {err}")
                    else:
                        logging.exception('')
                    tnx.rollback()

        return conflict_counter

def main():
    print(f"Start execute script. Works: {WORKS_COUNT}, workers: {WORKERS_COUNT}\n")

    with connect(DB_URI, user=DB_USER, password=DB_PASSWORD, charset=DB_CHARSET) as con:
        # Clean previous tasks from the queue
        con.begin()
        with con.cursor() as cur:
            cur.execute("DELETE FROM QUEUE_TASK")
        con.commit()
        # The task manager adds WORKS_COUNT tasks to the queue
        con.begin()
        with con.cursor() as cur:
            cur.execute(
                """
                EXECUTE BLOCK (CNT INTEGER = ?)
                AS
                DECLARE I INTEGER;
                BEGIN
                  I = 0;
                  WHILE (I < CNT) DO
                  BEGIN
                    I = I + 1;
                    INSERT INTO QUEUE_TASK(NAME)
                    VALUES ('Task ' || :I);
                  END
                END
                """,
                (WORKS_COUNT,)
            )
        con.commit()

    # Create the executors and run each one in a separate process
    workers = map(lambda worker_id: Worker(worker_id), range(WORKERS_COUNT))
    with pool.ProcessPoolExecutor(max_workers=WORKERS_COUNT) as executor:
        futures = map(lambda worker: executor.submit(worker.run), workers)
        conflicts = map(lambda future: future.result(), pool.as_completed(futures))
        conflict_count = sum(conflicts)

    # read statistics
    with connect(DB_URI, user=DB_USER, password=DB_PASSWORD, charset=DB_CHARSET) as con:
        cur = con.cursor()
        cur.execute("""
            SELECT
              COUNT(*) AS CNT_TASK,
              COUNT(*) FILTER(WHERE STARTED IS TRUE AND FINISH_TIME IS NULL) AS CNT_ACTIVE_TASK,
              COUNT(*) FILTER(WHERE FINISH_TIME IS NOT NULL) AS CNT_FINISHED_TASK,
              COUNT(*) FILTER(WHERE FINISH_STATUS = 0) AS CNT_SUCCESS,
              COUNT(*) FILTER(WHERE FINISH_STATUS = 1) AS CNT_ERROR,
              AVG(DATEDIFF(MILLISECOND FROM START_TIME TO FINISH_TIME)) AS AVG_ELAPSED_TIME,
              DATEDIFF(MILLISECOND FROM MIN(START_TIME) TO MAX(FINISH_TIME)) AS SUM_ELAPSED_TIME,
              CAST(? AS BIGINT) AS CONFLICTS
            FROM QUEUE_TASK
        """, (conflict_count,))
        row = cur.fetchone()
        cur.close()

        stat_columns = ["TASKS", "ACTIVE_TASKS", "FINISHED_TASKS", "SUCCESS", "ERROR", "AVG_ELAPSED_TIME",
                        "SUM_ELAPSED_TIME", "CONFLICTS"]

        stat_table = PrettyTable(stat_columns)
        stat_table.add_row(row)
        print("\nStatistics:")
        print(stat_table)

        cur = con.cursor()
        cur.execute("""
            SELECT
              ID,
              NAME,
              STARTED,
              WORKER_ID,
              START_TIME,
              FINISH_TIME,
              FINISH_STATUS,
              STATUS_TEXT
            FROM QUEUE_TASK
        """)
        rows = cur.fetchall()
        cur.close()

        columns = ["ID", "NAME", "STARTED", "WORKER", "START_TIME", "FINISH_TIME",
                   "STATUS", "STATUS_TEXT"]

        table = PrettyTable(columns)
        table.add_rows(rows)
        print("\nTasks:")
        print(table)

if __name__ == "__main__":
    main()

In this script, the task manager creates 40 tasks that must be completed by 4 executors. Each executor runs in its own process (via ProcessPoolExecutor) with its own database connection. When all tasks are done, the script prints task execution statistics, including the number of conflicts, followed by the list of tasks.
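Only two settings in the script are Windows-specific: the fbclient library path and the database path inside DB_URI. Below is a minimal sketch of choosing them by platform. The Linux library location and the `queue` database alias are assumptions (the alias would have to be declared in databases.conf on the server), and `connection_settings` is a hypothetical helper, not part of firebird-driver.

```python
import sys
from typing import Tuple

# Windows values used by the script above.
WINDOWS_CLIENT = "c:\\firebird\\5.0\\fbclient.dll"
WINDOWS_DB_URI = "inet://localhost:3055/c:\\fbdata\\5.0\\queue.fdb"

# Assumed Linux values: adjust them to your own installation.
LINUX_CLIENT = "/opt/firebird/lib/libfbclient.so"
LINUX_DB_URI = "inet://localhost:3055/queue"  # assumes an alias "queue" in databases.conf


def connection_settings(platform: str = sys.platform) -> Tuple[str, str]:
    """Hypothetical helper: return (fbclient library path, DB_URI) for a platform."""
    if platform.startswith("win"):
        return WINDOWS_CLIENT, WINDOWS_DB_URI
    return LINUX_CLIENT, LINUX_DB_URI


client_library, db_uri = connection_settings()
print(client_library, db_uri)
```

In queue_exec.py, the two constants could then be filled in with `driver_config.fb_client_library.value, DB_URI = connection_settings()` instead of hard-coding them.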

Let’s run the script:

python ./queue_exec.py
Start execute script. Works: 40, workers: 4

ERROR:root:Worker: 2, Task: 2, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95695
ERROR:root:Worker: 2, Task: 2, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95697
ERROR:root:Worker: 2, Task: 2, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95703
ERROR:root:Worker: 2, Task: 2, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95706
ERROR:root:Worker: 0, Task: 0, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95713
ERROR:root:Worker: 2, Task: 2, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95722
ERROR:root:Worker: 3, Task: 3, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95722
ERROR:root:Worker: 1, Task: 1, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95722
ERROR:root:Worker: 1, Task: 1, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95728
ERROR:root:Worker: 0, Task: 0, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95734
ERROR:root:Worker: 0, Task: 0, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95736
ERROR:root:Worker: 1, Task: 1, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95741
ERROR:root:Worker: 1, Task: 1, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95744
ERROR:root:Worker: 0, Task: 0, Error: deadlock
-update conflicts with concurrent update
-concurrent transaction number is 95749

Statistics:
+-------+--------------+----------------+---------+-------+------------------+------------------+-----------+
| TASKS | ACTIVE_TASKS | FINISHED_TASKS | SUCCESS | ERROR | AVG_ELAPSED_TIME | SUM_ELAPSED_TIME | CONFLICTS |
+-------+--------------+----------------+---------+-------+------------------+------------------+-----------+
|   40  |      0       |       40       |    28   |   12  |       43.1       |       1353       |     14    |
+-------+--------------+----------------+---------+-------+------------------+------------------+-----------+

Tasks:
+------+---------+---------+--------+--------------------------+--------------------------+--------+-------------+
|  ID  |   NAME  | STARTED | WORKER |         START_TIME       |        FINISH_TIME       | STATUS | STATUS_TEXT |
+------+---------+---------+--------+--------------------------+--------------------------+--------+-------------+
| 1341 |  Task 1 |   True  |   0    | 2023-07-06 15:35:29.9800 | 2023-07-06 15:35:30.0320 |    1   |  Some error |
| 1342 |  Task 2 |   True  |   0    | 2023-07-06 15:35:30.0420 | 2023-07-06 15:35:30.0800 |    1   |  Some error |
| 1343 |  Task 3 |   True  |   0    | 2023-07-06 15:35:30.0900 | 2023-07-06 15:35:30.1130 |    0   |      OK     |
| 1344 |  Task 4 |   True  |   0    | 2023-07-06 15:35:30.1220 | 2023-07-06 15:35:30.1450 |    0   |      OK     |
...

The output shows that the 4 executors constantly conflict over tasks: 14 update conflicts occurred while processing 40 tasks. The faster the tasks are completed and the more executors there are, the higher the likelihood of conflicts.
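A deliberately pessimistic model makes the trend visible. Assume (for illustration only) that every idle executor runs the plain SELECT at the same instant, so all of them receive the same task ID; the first UPDATE wins, and with lock_timeout=0 each of the others fails immediately with an update conflict and retries. In the real run above, random task durations spread the executors out, which is why far fewer conflicts were observed.

```python
def naive_worst_case_conflicts(task_count: int, workers: int) -> int:
    """Update conflicts when all executors poll the queue at the same moment (model)."""
    free_ids = list(range(1, task_count + 1))
    conflicts = 0
    while free_ids:
        free_ids.pop(0)  # every snapshot returns this same first free task
        conflicts += workers - 1  # one UPDATE succeeds, the others conflict and retry
    return conflicts


for workers in (1, 2, 4, 8):
    print(workers, naive_worst_case_conflicts(40, workers))
# 1 0
# 2 40
# 4 120
# 8 280
```

With a single executor there is nothing to conflict with; every additional executor that polls at the same moment adds one potential conflict per task.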

The SKIP LOCKED clause

How can we change our solution so that it works efficiently and without conflicts? This is where the new SKIP LOCKED clause of Firebird 5.0 comes in.

The SKIP LOCKED clause lets a statement skip records that are already locked by other transactions, so executors can work without conflicts. It can be used in statements where an update conflict is possible, that is, in SELECT ... WITH LOCK, UPDATE, and DELETE statements. Let’s look at its syntax:

SELECT
  [FIRST ...]
  [SKIP ...]
  FROM <sometable>
  [WHERE ...]
  [PLAN ...]
  [ORDER BY ...]
  [{ ROWS ... } | {OFFSET ...} | {FETCH ...}]
  [FOR UPDATE [OF ...]]
  [WITH LOCK [SKIP LOCKED]]

UPDATE <sometable>
  SET ...
  [WHERE ...]
  [PLAN ...]
  [ORDER BY ...]
  [ROWS ...]
  [SKIP LOCKED]
  [RETURNING ...]

DELETE FROM <sometable>
  [WHERE ...]
  [PLAN ...]
  [ORDER BY ...]
  [ROWS ...]
  [SKIP LOCKED]
  [RETURNING ...]
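The difference between plain WITH LOCK and WITH LOCK SKIP LOCKED in a no-wait transaction can be sketched in Python. This is a simplified in-memory model, not Firebird code: a held `threading.Lock` stands for a row locked by another active transaction, and `select_first_free` and `UpdateConflict` are hypothetical names. The model applies FETCH FIRST ROW ONLY after locked rows have been skipped, which is consistent with the Write Lock node sitting below First N Records in the execution plans later in this article.

```python
import threading
from typing import Dict, Optional


class UpdateConflict(Exception):
    """Stands in for 'update conflicts with concurrent update' (SQLSTATE 40001)."""


def select_first_free(started: Dict[int, bool], locks: Dict[int, threading.Lock],
                      skip_locked: bool) -> Optional[int]:
    """Model of SELECT ... WHERE STARTED IS FALSE ORDER BY ID
    FETCH FIRST ROW ONLY WITH LOCK [SKIP LOCKED] with lock_timeout=0.

    The lock on the returned row stays held, as it would until COMMIT/ROLLBACK.
    """
    for task_id in sorted(started):
        if started[task_id]:
            continue  # rejected by WHERE STARTED IS FALSE
        if locks[task_id].acquire(blocking=False):  # no wait: fail at once if locked
            return task_id
        if not skip_locked:
            # Plain WITH LOCK: hitting a locked row is an error
            raise UpdateConflict(f"record {task_id} is locked by another transaction")
        # SKIP LOCKED: ignore the locked row and try the next one
    return None


started = {1: False, 2: False, 3: True}
locks = {task_id: threading.Lock() for task_id in started}
locks[1].acquire()  # pretend another executor has locked task 1

print(select_first_free(started, locks, skip_locked=True))  # 2
try:
    select_first_free(started, locks, skip_locked=False)
except UpdateConflict as err:
    print("conflict:", err)  # conflict: record 1 is locked by another transaction
```

With SKIP LOCKED the executor simply moves on to task 2; without it, the same situation surfaces as the conflict seen in the earlier log.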

Job queue without conflicts

Let’s try to fix our script so that executors do not conflict over tasks.

To do this, we need to slightly rewrite the query in the __next_task method of the Worker class:

    @staticmethod
    def __next_task(tnx: TransactionManager):
        """Retrieves the next task from the queue.

        Arguments:
            tnx: The transaction in which the request is executed
        """
        cur = tnx.cursor()

        cur.execute("""
            SELECT ID, NAME
            FROM QUEUE_TASK
            WHERE STARTED IS FALSE
            ORDER BY ID
            FETCH FIRST ROW ONLY
            FOR UPDATE WITH LOCK SKIP LOCKED
        """)

        row = cur.fetchone()
        cur.close()
        return row

Let’s run the script:

python ./queue_exec.py
Start execute script. Works: 40, workers: 4

Statistics:
+-------+--------------+----------------+---------+-------+------------------+------------------+-----------+
| TASKS | ACTIVE_TASKS | FINISHED_TASKS | SUCCESS | ERROR | AVG_ELAPSED_TIME | SUM_ELAPSED_TIME | CONFLICTS |
+-------+--------------+----------------+---------+-------+------------------+------------------+-----------+
|   40  |      0       |       40       |    32   |   8   |       39.1       |       1048       |     0     |
+-------+--------------+----------------+---------+-------+------------------+------------------+-----------+

Tasks:
+------+---------+---------+--------+--------------------------+--------------------------+--------+-------------+
|  ID  |   NAME  | STARTED | WORKER |         START_TIME       |        FINISH_TIME       | STATUS | STATUS_TEXT |
+------+---------+---------+--------+--------------------------+--------------------------+--------+-------------+
| 1381 |  Task 1 |   True  |   0    | 2023-07-06 15:57:22.0360 | 2023-07-06 15:57:22.0740 |   0    |      OK     |
| 1382 |  Task 2 |   True  |   0    | 2023-07-06 15:57:22.0840 | 2023-07-06 15:57:22.1130 |   0    |      OK     |
| 1383 |  Task 3 |   True  |   0    | 2023-07-06 15:57:22.1220 | 2023-07-06 15:57:22.1630 |   0    |      OK     |
| 1384 |  Task 4 |   True  |   0    | 2023-07-06 15:57:22.1720 | 2023-07-06 15:57:22.1910 |   0    |      OK     |
| 1385 |  Task 5 |   True  |   0    | 2023-07-06 15:57:22.2020 | 2023-07-06 15:57:22.2540 |   0    |      OK     |
| 1386 |  Task 6 |   True  |   0    | 2023-07-06 15:57:22.2620 | 2023-07-06 15:57:22.3220 |   0    |      OK     |
| 1387 |  Task 7 |   True  |   0    | 2023-07-06 15:57:22.3300 | 2023-07-06 15:57:22.3790 |   1    |  Some error |
...

This time there are no conflicts. Thus, in Firebird 5.0 you can use the SKIP LOCKED clause to avoid unnecessary update conflicts.
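The same behaviour can be reproduced outside the database with a small threaded simulation. It is a sketch under stated assumptions: Python threads stand in for executor connections, one `threading.Lock` per task stands in for a record lock, and `claim_next_skip_locked`, `worker` and the other names are hypothetical. The re-check after acquiring the lock only keeps this in-memory model correct; it does not model Firebird's record versions or snapshot visibility.

```python
import threading

TASKS = 40
WORKERS = 4

# Shared "table": task ID -> STARTED flag, plus one record lock per row.
started = {task_id: False for task_id in range(1, TASKS + 1)}
row_locks = {task_id: threading.Lock() for task_id in started}
processed = []  # (worker_id, task_id) pairs, appended under processed_guard
processed_guard = threading.Lock()


def claim_next_skip_locked():
    """Model of SELECT ... WHERE STARTED IS FALSE ORDER BY ID
    FETCH FIRST ROW ONLY FOR UPDATE WITH LOCK SKIP LOCKED."""
    for task_id in sorted(started):
        if started[task_id]:
            continue  # filtered out by WHERE STARTED IS FALSE
        if not row_locks[task_id].acquire(blocking=False):
            continue  # locked by a concurrent "transaction": skip it, no error
        if started[task_id]:
            # Taken and committed by another worker between our read and our lock.
            row_locks[task_id].release()
            continue
        return task_id  # the lock stays held until "commit"
    return None


def worker(worker_id):
    while True:
        task_id = claim_next_skip_locked()
        if task_id is None:
            return  # nothing left that this worker can claim
        started[task_id] = True  # UPDATE QUEUE_TASK SET STARTED = TRUE ...
        row_locks[task_id].release()  # COMMIT releases the record lock
        with processed_guard:
            processed.append((worker_id, task_id))


threads = [threading.Thread(target=worker, args=(i,)) for i in range(WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(processed))  # 40
```

No code path in this model raises a conflict: a worker that meets a locked row just moves on, and each task ID ends up in `processed` exactly once, whichever worker took it.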

Next steps

Our job queue could be improved even more. Let’s look at the query execution plan:

SELECT
  ID, NAME
FROM QUEUE_TASK
WHERE STARTED IS FALSE
ORDER BY ID
FETCH FIRST ROW ONLY
FOR UPDATE WITH LOCK SKIP LOCKED

Select Expression
    -> First N Records
        -> Write Lock
            -> Filter
                -> Table "QUEUE_TASK" Access By ID
                    -> Index "PK_QUEUE_TASK" Full Scan

This execution plan is not very good. The record is retrieved by navigating the PK_QUEUE_TASK index, but that index is scanned in full (Index "PK_QUEUE_TASK" Full Scan), and each record reached through it has to be read to check the STARTED IS FALSE filter. If the QUEUE_TASK table is not cleared the way our script does it, completed tasks accumulate over time and selecting unprocessed tasks becomes slower and slower.
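A rough model shows why this plan degrades. Navigation walks PK_QUEUE_TASK in ID order, reads each record to evaluate STARTED IS FALSE, and stops at the first match. The sketch below (a simplification that ignores page caching, record versions and locked rows; `pk_navigation_reads` is a hypothetical name) counts those reads when the pending tasks sit behind a growing history of completed ones.

```python
from typing import List


def pk_navigation_reads(started_in_id_order: List[bool]) -> int:
    """Records read before FETCH FIRST ROW ONLY is satisfied (simplified model).

    started_in_id_order[i] is the STARTED value of the task with the i-th smallest ID.
    The scan walks PK_QUEUE_TASK in ID order, reads each record to evaluate
    STARTED IS FALSE and stops at the first match.
    """
    reads = 0
    for is_started in started_in_id_order:
        reads += 1  # fetch the record the index entry points to
        if not is_started:
            break  # first free task found
    return reads


# 10 pending tasks behind a growing history of completed ones
for completed in (0, 1_000, 100_000):
    print(completed, pk_navigation_reads([True] * completed + [False] * 10))
# 0 1
# 1000 1001
# 100000 100001
```

The cost of finding the next free task grows linearly with the number of completed tasks that are kept in the table, and when no task is free the whole table is read.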

You can create an index on the STARTED field. If the task manager keeps adding new tasks and the executors keep processing them, unstarted tasks are typically far fewer than completed ones, so this index filters tasks effectively. Let’s check it:

CREATE INDEX IDX_QUEUE_TASK_INACTIVE ON QUEUE_TASK(STARTED);

SELECT
  ID, NAME
FROM QUEUE_TASK
WHERE STARTED IS FALSE
ORDER BY ID
FETCH FIRST ROW ONLY
FOR UPDATE WITH LOCK SKIP LOCKED;

Select Expression
    -> First N Records
        -> Write Lock
            -> Filter
                -> Table "QUEUE_TASK" Access By ID
                    -> Index "PK_QUEUE_TASK" Full Scan
                        -> Bitmap
                            -> Index "IDX_QUEUE_TASK_INACTIVE" Range Scan (full match)

The filter now uses the new index, but the query involves two indexes: IDX_QUEUE_TASK_INACTIVE for filtering (through a bitmap) and PK_QUEUE_TASK for navigation.

We can go further and create a composite index:

DROP INDEX IDX_QUEUE_TASK_INACTIVE;

CREATE INDEX IDX_QUEUE_TASK_INACTIVE ON QUEUE_TASK(STARTED, ID);

Select Expression
    -> First N Records
        -> Write Lock
            -> Filter
                -> Table "QUEUE_TASK" Access By ID
                    -> Index "IDX_QUEUE_TASK_INACTIVE" Range Scan (partial match: 1/2)

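This is more efficient, since a single index is used for both filtering and navigation, and only part of it is scanned. However, such an index has a significant drawback: it contains an entry for every row in the table, including all completed tasks, so it is not compact (and therefore not very fast).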

To solve this problem, you can use another new feature of Firebird 5.0: partial indexes.

A partial index is an index that is built on a subset of table rows defined by a conditional expression (this is called a partial index predicate). Such an index contains entries only for rows satisfying the predicate.

Let’s create partial index:

DROP INDEX IDX_QUEUE_TASK_INACTIVE;

CREATE INDEX IDX_QUEUE_TASK_INACTIVE ON QUEUE_TASK (STARTED, ID) WHERE (STARTED IS FALSE);

SELECT
  ID, NAME
FROM QUEUE_TASK
WHERE STARTED IS FALSE
ORDER BY STARTED, ID
FETCH FIRST ROW ONLY
FOR UPDATE WITH LOCK SKIP LOCKED

Select Expression
    -> First N Records
        -> Write Lock
            -> Filter
                -> Table "QUEUE_TASK" Access By ID
                    -> Index "IDX_QUEUE_TASK_INACTIVE" Full Scan

A record from the QUEUE_TASK table is retrieved by navigating the IDX_QUEUE_TASK_INACTIVE index. Although the index is scanned in full, it is very compact, because it contains only keys for rows that satisfy the condition STARTED IS FALSE. In a normally operating task queue there are far fewer such rows than records of completed tasks.
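A toy model makes the size difference between the composite (STARTED, ID) index and the partial index concrete. It is an illustration under simplifying assumptions: `ToyIndex` is a hypothetical sorted list of keys, an optional predicate turns it into a partial index, and it ignores the fact that Firebird keeps index entries of old record versions until garbage collection.

```python
from bisect import insort
from typing import Callable, List, Optional, Tuple


class ToyIndex:
    """Sorted (STARTED, ID) keys; an optional predicate makes it a partial index."""

    def __init__(self, predicate: Optional[Callable[[bool], bool]] = None):
        self.predicate = predicate  # None = regular index over all rows
        self.keys: List[Tuple[bool, int]] = []

    def _covers(self, started: bool) -> bool:
        return self.predicate is None or self.predicate(started)

    def insert(self, task_id: int, started: bool) -> None:
        if self._covers(started):
            insort(self.keys, (started, task_id))

    def update(self, task_id: int, old_started: bool, new_started: bool) -> None:
        # Drop the old key (if it was indexed) and add the new one (if it qualifies).
        if self._covers(old_started):
            self.keys.remove((old_started, task_id))
        self.insert(task_id, new_started)

    def first_unstarted(self) -> Optional[int]:
        # ORDER BY STARTED, ID FETCH FIRST ROW ONLY: FALSE sorts before TRUE.
        if self.keys and self.keys[0][0] is False:
            return self.keys[0][1]
        return None


composite = ToyIndex()                                     # ON QUEUE_TASK(STARTED, ID)
partial = ToyIndex(predicate=lambda started: not started)  # ... WHERE (STARTED IS FALSE)

for task_id in range(1, 1001):  # the task manager adds 1000 tasks
    for index in (composite, partial):
        index.insert(task_id, started=False)
for task_id in range(1, 991):  # executors have taken 990 of them
    for index in (composite, partial):
        index.update(task_id, old_started=False, new_started=True)

print(len(composite.keys), len(partial.keys))                  # 1000 10
print(composite.first_unstarted(), partial.first_unstarted())  # 991 991
```

Both indexes return the same next task, but the composite one keeps an entry for every task ever queued, while the partial one holds only the 10 tasks still waiting.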

Summary

In this material we demonstrated how to use the new SKIP LOCKED clause that appeared in Firebird 5.0, and also showed an example of partial indexes, another feature introduced in Firebird 5.0.

A DDL script for creating a database, as well as a Python script with emulation of a task queue can be downloaded from the following links: