Saving Queue Data on to Disk

Saving data of your queues on to disk instead of in-memory in Python

In your python program you might need to save the data of your queues on to disk. One use case can be lets say you are streaming location data of a device to a server and when the network is poor or signal is weak, the device location data cannot be uploaded to server thus causing the location data to be lost. In such cases you can save the location data onto disk of your client device with status as unacknowledged and when the connection is re-established you can send the unacknowledged data back to the server.

One of the python libraries that helps to achieve this is persist-queue, a thread-safe disk based persistent queue.

persist-queue implements a file-based queue and a serial of sqlite3-based queues:

  • Disk-based: each queued item should be stored in disk in case of any crash.
  • Thread-safe: can be used by multi-threaded producers and multi-threaded consumers.
  • Recoverable: Items can be read after process restart.
  • Green-compatible: can be used in greenlet or eventlet environment.

Here is a simple python program demonstrating the usage of persist-queue library to save the queue data on to disk.

import threading
import time
from persistqueue import SQLiteAckQueue
"""
SQLiteAckQueue status
inited = '0'
ready = '1'
unack = '2'
acked = '5'
ack_failed = '9'
"""
def func(q: SQLiteAckQueue, thread_no):
q.resume_unack_tasks()
while True:
task = q.get()
#To acknowledge the item in the queue : status 5
preId = q.ack(task)
time.sleep(2)
print(f'Thread #{thread_no} is doing task #{task} in the queue.')
state_queue = SQLiteAckQueue(
"../state", multithreading=True, auto_commit=True
)
for i in range(4):
worker = threading.Thread(target=func, args=(state_queue, i,))
worker.start()
for j in range(10):
#q.put(j)
state_queue.put(j)

Here is how the data is persisted on the SQLite Database with and without acknowledgements where
status=5 (Acknowledged when preId = q.ack(task) is enabled in the above code)
status=2 (Unacknowledged when preId = q.ack(task) is commented in the above code)

Queue data on SQLite DB