@simonw
Created February 14, 2025 06:38

Detailed Analysis of Running SQLite Queries in an Asyncio Application

This analysis examines how the provided code, taken from a project such as Datasette, runs SQLite queries inside an asyncio application. The design must reconcile the synchronous nature of SQLite (and its Python binding) with an application built around asyncio's non-blocking event loop. We'll break down the mechanics of the implementation, weigh its advantages and drawbacks, and then propose some alternative approaches.


1. How the Code Works

a. Dual-Mode Operation: Threaded vs. Non-Threaded

The Database class supports two execution “modes” (a minimal sketch of the branch appears after this list):

  • Non-threaded mode: When self.ds.executor is None, the database methods use pre-established connections and run queries directly in the main thread (or current thread) while using asynchronous wrappers to keep the API consistent.
  • Threaded mode: When an executor is provided, the database read operations run on threads via asyncio.get_event_loop().run_in_executor(...) so that blocking database calls do not block the event loop.
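
A minimal sketch of this branching, under the assumption of a Datasette-style ds.executor attribute; execute_fn, _read_connection, and _prepare_connection mirror the description below but are simplified stand-ins, not the project's exact code:

import asyncio
import sqlite3
import threading

class Database:
    def __init__(self, ds, path):
        self.ds = ds                     # ds.executor is a ThreadPoolExecutor, or None
        self.path = path
        self._read_connection = None     # single reused connection in non-threaded mode
        self._local = threading.local()  # one connection per pool thread in threaded mode

    async def execute_fn(self, fn):
        if self.ds.executor is None:
            # Non-threaded mode: run the blocking call directly on the current thread.
            if self._read_connection is None:
                self._read_connection = sqlite3.connect(self.path)
                # (the real code also configures the connection via self.ds._prepare_connection)
            return fn(self._read_connection)

        def in_thread():
            # Threaded mode: each executor thread lazily opens and keeps its own connection.
            conn = getattr(self._local, "conn", None)
            if conn is None:
                conn = self._local.conn = sqlite3.connect(self.path)
            return fn(conn)

        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(self.ds.executor, in_thread)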

b. Read Operations

  • Single Connection & Thread-local Reuse:
    The execute_fn method is used to run queries for reading. In non-threaded mode, it creates or reuses a “read connection” (stored as self._read_connection) and configures it (using self.ds._prepare_connection).
    In threaded mode, an inner function (using thread-local storage via threading.local()) is executed in a thread pool using run_in_executor. This ensures each thread running a query has its own connection to avoid cross-thread issues.

  • Time-limit Enforcement:
    The execute method wraps the actual query execution in the context manager sqlite_timelimit. This context manager employs SQLite’s progress handler (via conn.set_progress_handler) to interrupt long-running queries based on a configurable time limit.
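
As a minimal sketch, a time limit of this kind can be built on sqlite3's set_progress_handler: the handler runs roughly every N virtual-machine instructions, and returning a non-zero value aborts the current statement, which surfaces as sqlite3.OperationalError ("interrupted"). The names and the 1,000-instruction granularity below are illustrative, not the project's exact values:

import contextlib
import sqlite3
import time

@contextlib.contextmanager
def sqlite_timelimit(conn, ms):
    deadline = time.monotonic() + ms / 1000

    def handler():
        # A non-zero return value tells SQLite to abort the current statement.
        return 1 if time.monotonic() > deadline else 0

    # The handler is invoked approximately every 1,000 virtual-machine instructions.
    conn.set_progress_handler(handler, 1000)
    try:
        yield
    finally:
        conn.set_progress_handler(None, 1000)

# Usage: a query that exceeds the limit raises sqlite3.OperationalError ("interrupted").
conn = sqlite3.connect(":memory:")
with sqlite_timelimit(conn, 50):
    try:
        conn.execute(
            "WITH RECURSIVE c(x) AS (SELECT 1 UNION ALL SELECT x + 1 FROM c) "
            "SELECT count(*) FROM c").fetchall()
    except sqlite3.OperationalError as e:
        print("query aborted:", e)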

c. Write Operations: Dedicated Write Thread with Queue

Since SQLite permits only one writer at a time, and a single connection must not be shared by multiple threads writing concurrently, the code serializes all writes through a dedicated write thread (a sketch of the full round trip follows this list):

  • Queueing Write Tasks:
    Methods like execute_write, execute_write_script, and execute_write_many call into execute_write_fn. When running in threaded mode, instead of applying the write immediately, the function is wrapped in a task (a WriteTask object) and put on a write queue (an instance of the standard queue.Queue).

  • Using Janus for Bridging Sync and Async:
    A janus.Queue is created for each write task as a “reply_queue.” Janus provides both synchronous and asynchronous interfaces to a queue, thereby allowing seamless communication between the synchronous write thread and asynchronous callers.

  • The Write Thread’s Loop:
    The dedicated write thread (started once and stored as self._write_thread) runs the _execute_writes method in an infinite loop. This loop:

    • Creates or reuses a “write connection” to the database by calling the connect(write=True) method.
    • Processes tasks one by one from the write queue.
    • For each task:
      • If isolated_connection is set, a new write connection is established just for that task, executed, and then closed.
      • Otherwise, the persistent write connection is used, and optionally a transaction is enforced using a context manager (with conn:).
    • Finally, the result or any exception is put onto the synchronous side of the janus queue (task.reply_queue.sync_q.put(result)), so the original asynchronous caller, awaiting reply_queue.async_q.get(), receives the response.
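
Putting these pieces together, the write path can be sketched as below. This is a simplified stand-in, assuming the janus package is installed; WriteTask, WriteThreadDemo, and execute_write_fn only mirror the structure described above:

import asyncio
import dataclasses
import queue
import sqlite3
import threading

import janus

@dataclasses.dataclass
class WriteTask:
    fn: callable             # callable that receives the write connection
    reply_queue: janus.Queue  # bridge back to the async caller

class WriteThreadDemo:
    def __init__(self, path):
        self.path = path
        self._write_queue = queue.Queue()
        self._write_thread = threading.Thread(target=self._execute_writes, daemon=True)
        self._write_thread.start()

    def _execute_writes(self):
        conn = sqlite3.connect(self.path)  # persistent write connection, owned by this thread
        while True:
            task = self._write_queue.get()
            try:
                with conn:                 # run each write inside a transaction
                    result = task.fn(conn)
            except Exception as e:
                result = e
            task.reply_queue.sync_q.put(result)  # hand the result back to the async caller

    async def execute_write_fn(self, fn):
        reply_queue = janus.Queue()        # must be created while the event loop is running
        self._write_queue.put(WriteTask(fn, reply_queue))
        result = await reply_queue.async_q.get()
        if isinstance(result, Exception):
            raise result
        return result

async def main():
    db = WriteThreadDemo("demo_writes.db")
    await db.execute_write_fn(lambda conn: conn.execute(
        "CREATE TABLE IF NOT EXISTS t (id INTEGER PRIMARY KEY, v TEXT)"))
    await db.execute_write_fn(lambda conn: conn.execute(
        "INSERT INTO t (v) VALUES (?)", ("hello",)))

asyncio.run(main())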

d. Summary of Mechanisms for Concurrency

  • Separation of Concerns:
    Read operations get their own thread-local connection and use a thread pool when in threaded mode.
    Write operations are consolidated through a dedicated thread using a simple queue, ensuring they do not run concurrently or conflict over the same connection.

  • Error Handling and Isolation:
    The mechanism supports “isolated” writes where a dedicated connection is opened for a particular task to avoid interference with an ongoing transaction on the persistent write connection.
    Errors encountered either in setting up the connection or during execution are packaged and returned via the queue so that the asynchronous caller can handle them.


2. Pros and Cons of This Design

Advantages

  • Safe Serialization of Writes:
    By funneling all write operations through a single dedicated thread and queue, the design avoids SQLite’s potential pitfalls when multiple threads try to write concurrently on the same connection.

  • Non-Blocking Async API:
    The use of run_in_executor for read operations and a dedicated thread for write operations means that blocking SQLite calls do not block the main asyncio event loop. This helps maintain responsiveness in the overall async application.

  • Flexible Mode Selection:
    The design supports a “non-threaded mode” (when no executor is provided) for simpler use cases, while scaling to threaded execution when higher concurrency is needed.

  • Progress Handler for Time-Limits:
    Using sqlite_timelimit with SQLite’s progress handler adds a layer of protection against runaway queries by interrupting them when they take too long.

Disadvantages

  • Increased Complexity:
    Managing two different modes (threaded vs. non-threaded), a dedicated write thread, and the bridging via janus for write requests introduces considerable complexity. The code must juggle multiple connections, queues, and error paths.

  • Resource Management Challenges:
    The write thread runs in an infinite loop and requires careful management of connections (e.g., cleaning up connections and the thread itself when shutting down). Without proper shutdown procedures, there is a risk of leaving lingering threads or open file handles.

  • Overhead of Thread Hopping:
    Although using an executor avoids blocking the event loop, every read query in threaded mode involves a context switch and potentially unnecessary overhead when many queries are lightweight.

  • Dependency on Third-Party Libraries:
    The use of the janus library (for bridging sync and async queues) and the manual management of threads add external dependencies that must be maintained and understood by future developers.

  • Potential for Suboptimal Concurrency:
    While write queries are serialized for safety, this can become a bottleneck if the application needs to handle many concurrent write requests. Also, using a separate isolated write connection for each “isolated” task could lead to frequent connection setup overhead.


3. Alternative Mechanisms That Might Work Better

Given the challenges of juggling blocking I/O with asyncio, consider these alternatives:

a. Use an Async-Aware SQLite Library (e.g., aiosqlite)

  • Description:
    Libraries like aiosqlite provide an asynchronous interface for SQLite by wrapping the synchronous SQLite API in an executor internally.
  • Benefits:
    This approach abstracts away much of the boilerplate of managing thread pools, dedicated write threads, and complex queueing logic. The API is cleaner and more “async/await friendly,” reducing the risk of subtle concurrency bugs.
  • Potential Downsides:
    While aiosqlite manages the thread pool internally, you still need to be aware of SQLite's limitations (such as serialized writes). However, the library is designed for these scenarios and already handles many of the common pitfalls.
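
For illustration, a minimal aiosqlite version of a write followed by a read might look like this (assuming the aiosqlite package is installed):

import asyncio
import aiosqlite

async def main():
    async with aiosqlite.connect("example.db") as db:
        await db.execute("CREATE TABLE IF NOT EXISTS items (id INTEGER PRIMARY KEY, name TEXT)")
        await db.execute("INSERT INTO items (name) VALUES (?)", ("widget",))
        await db.commit()                    # writes still need an explicit commit
        async with db.execute("SELECT id, name FROM items") as cursor:
            async for row in cursor:         # iterate rows without blocking the event loop
                print(row)

asyncio.run(main())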

b. Use a Connection Pooling Mechanism

  • Description:
    Instead of a dedicated write thread and thread-local connections for reads, a connection pool can be implemented where each async task obtains its own connection from the pool.
  • Benefits:
    A pool can simplify resource management and allow several connections to coexist, which may better support a mix of read and write operations when SQLite is configured appropriately (for example, enabling WAL journal mode so readers do not block the single writer).
  • Potential Downsides:
    Pool management introduces its own complexity, and ensuring that write operations remain serializable (or are safely concurrent) may still require additional locking mechanisms.
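
A toy pool along these lines could hand out sqlite3 connections through an asyncio.Queue and run each borrowed query in the default executor. This is only a sketch under those assumptions; a production pool would also want WAL mode, busy-timeout/retry handling, and orderly shutdown:

import asyncio
import sqlite3

class SQLitePool:
    def __init__(self, path, size=4):
        self._pool = asyncio.Queue()
        for _ in range(size):
            # check_same_thread=False because queries run on executor threads.
            conn = sqlite3.connect(path, check_same_thread=False)
            self._pool.put_nowait(conn)

    async def fetchall(self, sql, params=()):
        conn = await self._pool.get()        # borrow a connection for exclusive use
        try:
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(
                None, lambda: conn.execute(sql, params).fetchall())
        finally:
            self._pool.put_nowait(conn)      # return it to the pool

async def main():
    pool = SQLitePool("example.db", size=3)
    await pool.fetchall("CREATE TABLE IF NOT EXISTS t (id INTEGER PRIMARY KEY, v TEXT)")
    print(await pool.fetchall("SELECT count(*) FROM t"))

asyncio.run(main())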

c. Offload All Database Operations to a Separate Process

  • Description:
    Another model is to run all SQLite queries in a separate process (or even microservice) and communicate with it via an IPC mechanism (such as a REST API, message queue, or RPC).
  • Benefits:
    This separates concerns entirely and prevents the blocking I/O of SQLite from ever affecting the asyncio event loop.
  • Potential Downsides:
    It introduces inter-process communication overhead and requires careful error handling and orchestration between processes. It may be overkill for many applications.
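
As one rough illustration, every query could be funneled through a single-worker ProcessPoolExecutor, with the connection opened lazily inside the worker process (SQLite connections cannot be pickled across process boundaries). A real deployment would more likely expose a small service over HTTP or a message queue:

import asyncio
import sqlite3
from concurrent.futures import ProcessPoolExecutor

_conn = None  # one connection per worker process, created on first use

def run_query(path, sql, params=()):
    global _conn
    if _conn is None:
        _conn = sqlite3.connect(path)
    with _conn:                      # commit (or roll back) each statement
        return _conn.execute(sql, params).fetchall()

async def main():
    # A single worker process serializes all database access outside the event loop.
    executor = ProcessPoolExecutor(max_workers=1)
    loop = asyncio.get_event_loop()
    db = "example.db"
    await loop.run_in_executor(executor, run_query, db,
                               "CREATE TABLE IF NOT EXISTS t (id INTEGER PRIMARY KEY, v TEXT)")
    await loop.run_in_executor(executor, run_query, db,
                               "INSERT INTO t (v) VALUES (?)", ("from another process",))
    rows = await loop.run_in_executor(executor, run_query, db, "SELECT id, v FROM t")
    print(rows)
    executor.shutdown()

if __name__ == "__main__":
    asyncio.run(main())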

d. Build on Top of Higher-Level Abstractions

  • Description:
    Frameworks or ORMs that are designed for async operations (such as Tortoise ORM or SQLAlchemy 1.4+ with asyncio support) can handle many of these concurrency challenges internally.
  • Benefits:
    These libraries typically provide robust connection handling, pooling, and concurrency management that relieve application code from these lower-level details.
  • Potential Downsides:
    Integrating a higher-level framework might require significant refactoring if the application is already built around raw SQLite connections and custom query handling.
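
For example, SQLAlchemy's asyncio support (1.4+) can drive SQLite through the aiosqlite dialect, with the engine managing pooling and connection lifecycle; this assumes both sqlalchemy and aiosqlite are installed:

import asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

async def main():
    engine = create_async_engine("sqlite+aiosqlite:///example.db")
    async with engine.begin() as conn:       # begin() commits the block as one transaction
        await conn.execute(text(
            "CREATE TABLE IF NOT EXISTS items (id INTEGER PRIMARY KEY, name TEXT)"))
        await conn.execute(text("INSERT INTO items (name) VALUES (:name)"), {"name": "widget"})
    async with engine.connect() as conn:
        result = await conn.execute(text("SELECT id, name FROM items"))
        print(result.fetchall())
    await engine.dispose()

asyncio.run(main())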

Conclusion

The presented code offers a solution that mediates between SQLite’s synchronous API and asyncio’s non-blocking design by:

  • Running read operations in an executor (or via thread-local connections),
  • Serializing writes through a dedicated thread and write queue with janus acting as a bridge.

While this design has notable benefits in terms of data integrity and event loop responsiveness, its complexity and potential performance bottlenecks make alternatives worth considering: adopting a library like aiosqlite, using a connection pool, or offloading database work to a separate process could simplify the implementation and improve scalability.

By choosing an alternative that better matches the application's expected load and operational context, developers can often cut boilerplate and lower the risk of concurrency issues while still keeping the benefits of asynchronous programming.


simonw commented Feb 14, 2025

llm -c "now write code that is very likely to break the database, through concurrent writes or whatever. DO not output the classes again."
#!/usr/bin/env python3
"""
This script is designed to aggressively stress the database with concurrent writes
that update the same row. It is very likely to generate locking conflicts or even break
the database’s consistency if proper locking/retry logic isn’t in place.

WARNING:
– This script issues many concurrent UPDATE statements on the same record.
– It is intended for testing purposes only and may result in errors like "database is locked".
– Use it on a test database only!

Assumptions:
  • An AsyncDB instance (from the previously defined code) exists.
  • Do not re-include class definitions here. The AsyncDB instance is imported or defined elsewhere.
  
To run this test, ensure that AsyncDB is accessible in your PYTHONPATH.
"""

import asyncio
import random

# Create an AsyncDB instance pointing to a test database.
# For this test, we intentionally use a pool size > 1 so that multiple connections
# can attempt writes concurrently. Adjust the pool_size if desired.
db = AsyncDB("stress_break_test.db", pool_size=5)

async def conflicting_write(task_id):
    """
    This update function is deliberately very aggressive:
      - It updates the same row (id == 1) by incrementing its value.
      - A random short delay is added to increase the chance of overlapping transactions.
      - With many concurrent tasks, at least some will experience locking conflicts.
    """
    try:
        await asyncio.sleep(random.uniform(0, 0.05))
        # Attempt to update the same row concurrently.
        # Each call commits immediately.
        await db.execute("UPDATE test SET value = value + 1 WHERE id = 1", commit=True)
        print(f"Task {task_id} update successful")
    except Exception as e:
        # Expect many "database is locked" errors or similar conflicts
        print(f"Task {task_id} failed with exception: {e}")

async def main():
    # Set up the table:
    # Drop any existing test table, create a new one, and insert a single initial row.
    print("Setting up test table...")
    await db.execute("DROP TABLE IF EXISTS test", commit=True)
    await db.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value INTEGER)", commit=True)
    await db.execute("INSERT INTO test (value) VALUES (0)", commit=True)
    
    # Launch a large number of concurrent update tasks to fight over that one row.
    num_tasks = 200  # Increase this number to make conflicts even more likely.
    tasks = [asyncio.create_task(conflicting_write(i)) for i in range(num_tasks)]
    
    # Wait for all tasks to finish (errors will be printed).
    await asyncio.gather(*tasks, return_exceptions=True)
    
    # Query and print the final value in the row.
    final = await db.execute("SELECT value FROM test WHERE id = 1")
    final_value = final[0][0] if final and len(final) > 0 else None
    print("Final value in test table (expected to be close to number of successful updates):", final_value)
    
    # Clean up the AsyncDB (shut down ThreadPoolExecutor and close connections)
    db.close()

if __name__ == '__main__':
    asyncio.run(main())
