goqite

package module
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 20, 2025 License: MIT Imports: 7 Imported by: 2

README

goqite

Logo

GoDoc CI codecov

goqite (pronounced Go-queue-ite) is a persistent message queue Go library built on SQLite and inspired by AWS SQS (but much simpler).

Also supports Postgres!

go get maragu.dev/goqite

Made with ✨sparkles✨ by maragu: independent software consulting for cloud-native Go apps & AI engineering.

Contact me at markus@maragu.dk for consulting work, or perhaps an invoice to support this project?

Features

  • Messages are persisted in a single database table.
  • Messages are sent to and received from the queue, and are guaranteed to not be redelivered before a timeout occurs.
  • Support for multiple queues in one table.
  • Message timeouts can be extended, to support e.g. long-running tasks.
  • A job runner abstraction is provided on top of the queue, for your background tasks.
  • A simple HTTP handler is provided for your convenience.
  • No non-test dependencies. Bring your own SQL driver.

Examples

Queue
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log/slog"
	"os"
	"time"

	_ "github.com/mattn/go-sqlite3"

	"maragu.dev/goqite"
)

// main walks through the basic queue lifecycle: create the schema, send a
// message, receive it, extend its timeout, and finally delete it.
func main() {
	log := slog.Default()

	// Setup the db
	db, err := sql.Open("sqlite3", ":memory:?_journal=WAL&_timeout=5000&_fk=true")
	if err != nil {
		log.Info("Error opening db", "error", err)
		return
	}
	// Release the database handle on every exit path, including the early returns below.
	defer db.Close()

	// Keep the pool at a single connection.
	// NOTE(review): presumably because each additional connection to ":memory:"
	// would see its own empty database — confirm against the driver docs.
	db.SetMaxOpenConns(1)
	db.SetMaxIdleConns(1)

	// Setup the schema
	schema, err := os.ReadFile("schema_sqlite.sql")
	if err != nil {
		log.Info("Error reading schema:", "error", err)
		return
	}

	if _, err := db.Exec(string(schema)); err != nil {
		log.Info("Error executing schema:", "error", err)
		return
	}

	// Create a new queue named "jobs".
	// You can also customize the message redelivery timeout and maximum receive count,
	// but here, we use the defaults.
	q := goqite.New(goqite.NewOpts{
		DB:   db,
		Name: "jobs",
	})

	// Send a message to the queue.
	// Note that the body is an arbitrary byte slice, so you can decide
	// what kind of payload you have. You can also set a message delay.
	err = q.Send(context.Background(), goqite.Message{
		Body: []byte("yo"),
	})
	if err != nil {
		log.Info("Error sending message", "error", err)
		return
	}

	// Receive a message from the queue, during which time it's not available to
	// other consumers (until the message timeout has passed).
	m, err := q.Receive(context.Background())
	if err != nil {
		log.Info("Error receiving message", "error", err)
		return
	}
	// Receive returns a nil message (and a nil error) when the queue has nothing
	// to deliver, so guard before dereferencing m.
	if m == nil {
		log.Info("No message received")
		return
	}

	fmt.Println(string(m.Body))

	// If you need more time for processing the message, you can extend
	// the message timeout as many times as you want.
	if err := q.Extend(context.Background(), m.ID, time.Second); err != nil {
		log.Info("Error extending message timeout", "error", err)
		return
	}

	// Make sure to delete the message, so it doesn't get redelivered.
	if err := q.Delete(context.Background(), m.ID); err != nil {
		log.Info("Error deleting message", "error", err)
		return
	}
}
Jobs
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log/slog"
	"os"
	"time"

	_ "github.com/mattn/go-sqlite3"

	"maragu.dev/goqite"
	"maragu.dev/goqite/jobs"
)

// main shows the job runner: it creates the schema, registers a "print" job on
// a runner backed by the "jobs" queue, enqueues one job, and runs the runner
// until a short timeout cancels its context.
func main() {
	log := slog.Default()

	// Setup the db
	db, err := sql.Open("sqlite3", ":memory:?_journal=WAL&_timeout=5000&_fk=true")
	if err != nil {
		log.Info("Error opening db", "error", err)
		return
	}
	// Release the database handle on every exit path, including the early returns below.
	defer db.Close()

	// Keep the pool at a single connection.
	// NOTE(review): presumably because each additional connection to ":memory:"
	// would see its own empty database — confirm against the driver docs.
	db.SetMaxOpenConns(1)
	db.SetMaxIdleConns(1)

	// Setup the schema
	schema, err := os.ReadFile("schema_sqlite.sql")
	if err != nil {
		log.Info("Error reading schema:", "error", err)
		return
	}

	if _, err := db.Exec(string(schema)); err != nil {
		log.Info("Error executing schema:", "error", err)
		return
	}

	// Make a new queue for the jobs. You can have as many of these as you like, just name them differently.
	q := goqite.New(goqite.NewOpts{
		DB:   db,
		Name: "jobs",
	})

	// Make a job runner with a job limit of 1 and a short message poll interval.
	r := jobs.NewRunner(jobs.NewRunnerOpts{
		Limit:        1,
		Log:          log,
		PollInterval: 10 * time.Millisecond,
		Queue:        q,
	})

	// Register our "print" job.
	r.Register("print", func(ctx context.Context, m []byte) error {
		fmt.Println(string(m))
		return nil
	})

	// Create a "print" job with a message.
	if err := jobs.Create(context.Background(), q, "print", []byte("Yo")); err != nil {
		log.Info("Error creating job", "error", err)
		// Nothing was enqueued, so there is no point in starting the runner;
		// bail out like every other error branch in this example.
		return
	}

	// Stop the job runner after a timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
	defer cancel()

	// Start the job runner and see the job run.
	r.Start(ctx)
}

Using PostgreSQL

To use goqite with PostgreSQL instead of SQLite:

// Blank import: the pgx database/sql adapter is pulled in only for its side effects.
import _ "github.com/jackc/pgx/v5/stdlib"

// Create the queue with PostgreSQL flavor
// (db is not defined in this snippet; it must already be an open handle).
q := goqite.New(goqite.NewOpts{
	DB:        db,  // *sql.DB connected to PostgreSQL
	Name:      "jobs",
	SQLFlavor: goqite.SQLFlavorPostgreSQL,
})

Make sure to use the PostgreSQL schema provided below when setting up your database.

Schemas

SQLite
-- Single table holding the messages of every queue; the queue column tells them apart.
-- NOTE(review): Message.Priority is part of the documented API for this version, but
-- no column in this schema stores it — confirm this listing matches the
-- schema_sqlite.sql file shipped with the module before using it.
create table goqite (
  -- 'm_' followed by the lowercase hex encoding of 16 random bytes.
  id text primary key default ('m_' || lower(hex(randomblob(16)))),
  -- created, updated and timeout are text timestamps produced by strftime,
  -- in the form YYYY-MM-DDTHH:MM:SS.SSSZ.
  created text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
  updated text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
  -- Name of the queue the message belongs to.
  queue text not null,
  -- Message payload, stored as raw bytes.
  body blob not null,
  -- NOTE(review): presumably the earliest time the message may be delivered again — confirm.
  timeout text not null default (strftime('%Y-%m-%dT%H:%M:%fZ')),
  -- NOTE(review): presumably the receive counter checked against MaxReceive — confirm.
  received integer not null default 0
) strict;

-- Refresh the updated timestamp of a row whenever that row is updated.
create trigger goqite_updated_timestamp after update on goqite begin
  update goqite set updated = strftime('%Y-%m-%dT%H:%M:%fZ') where id = old.id;
end;

-- Index on queue name, then creation time.
create index goqite_queue_created_idx on goqite (queue, created);
PostgreSQL
-- pgcrypto supplies gen_random_bytes, which the id default below relies on.
create extension if not exists pgcrypto;

-- Trigger function: stamps the row being written with the current time.
create function update_timestamp()
returns trigger as $$
begin
   new.updated = now();
   return new;
end;
$$ language plpgsql;

-- Single table holding the messages of every queue; the queue column tells them apart.
-- NOTE(review): Message.Priority is part of the documented API for this version, but
-- no column in this schema stores it — confirm this listing matches the PostgreSQL
-- schema file shipped with the module before using it.
create table goqite (
  -- 'm_' followed by the hex encoding of 16 random bytes.
  id text primary key default ('m_' || encode(gen_random_bytes(16), 'hex')),
  created timestamptz not null default now(),
  updated timestamptz not null default now(),
  -- Name of the queue the message belongs to.
  queue text not null,
  -- Message payload, stored as raw bytes.
  body bytea not null,
  -- NOTE(review): presumably the earliest time the message may be delivered again — confirm.
  timeout timestamptz not null default now(),
  -- NOTE(review): presumably the receive counter checked against MaxReceive — confirm.
  received integer not null default 0
);

-- Run update_timestamp before every row update so updated stays current.
create trigger goqite_updated_timestamp
before update on goqite
for each row execute procedure update_timestamp();

-- Index on queue name, then creation time.
create index goqite_queue_created_idx on goqite (queue, created);

Benchmarks

Just for fun, some benchmarks. 🤓

On a MacBook Pro with M3 Ultra chip and SSD, sequentially sending, receiving, and deleting a message:

$ make benchmark
go test -cpu 1,2,4,8,16 -bench=.
goos: darwin
goarch: arm64
pkg: github.com/maragudk/goqite
BenchmarkQueue/send,_receive,_delete            	   21444	     54262 ns/op
BenchmarkQueue/send,_receive,_delete-2          	   17278	     68615 ns/op
BenchmarkQueue/send,_receive,_delete-4          	   16092	     73888 ns/op
BenchmarkQueue/send,_receive,_delete-8          	   15346	     78255 ns/op
BenchmarkQueue/send,_receive,_delete-16         	   15106	     79517 ns/op

Note that the slowest result above is around 12,500 messages / second with 16 parallel producers/consumers. The fastest result is around 18,500 messages / second with just one producer/consumer. (SQLite only allows one writer at a time, so the parallelism just creates write contention.)

Documentation

Overview

Package goqite provides the named Queue. It is backed by a SQLite or PostgreSQL table where the messages are stored.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ID

// ID identifies a Message. It is the type of Message.ID and of the id argument
// taken by Delete and Extend.
type ID string

type Message

// Message is what Send puts on a Queue and what Receive hands back.
type Message struct {
	// ID identifies the message; pass it to Delete or Extend.
	ID       ID
	// Body is the payload, an arbitrary byte slice.
	Body     []byte
	// Delay is optional.
	// NOTE(review): presumably how long after sending the message first becomes receivable — confirm.
	Delay    time.Duration
	Priority int // Higher priority messages are received first
}

type NewOpts

// NewOpts are the options accepted by New.
type NewOpts struct {
	// DB is the database handle the queue operates on.
	DB         *sql.DB
	// New defaults MaxReceive to 3 when it is not given.
	MaxReceive int // Max receive count for messages before they cannot be received anymore.
	// Name of the queue. Several differently named queues can share one table.
	Name       string
	// SQLFlavor is the SQL flavor to use. Its zero value is SQLFlavorSQLite.
	SQLFlavor  SQLFlavor
	// New defaults Timeout to five seconds when it is not given.
	Timeout    time.Duration // Default timeout for messages before they can be re-received.
}

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func New

func New(opts NewOpts) *Queue

New creates a Queue with the given options. Defaults if not given: logs are discarded, the max receive count is 3, the timeout is five seconds, and the SQL flavor is SQLite.

func (*Queue) Delete

func (q *Queue) Delete(ctx context.Context, id ID) error

Delete a Message from the queue by id.

func (*Queue) DeleteTx

func (q *Queue) DeleteTx(ctx context.Context, tx *sql.Tx, id ID) error

DeleteTx is like Delete, but within an existing transaction.

func (*Queue) Extend

func (q *Queue) Extend(ctx context.Context, id ID, delay time.Duration) error

Extend a Message timeout by the given delay from now.

func (*Queue) ExtendTx

func (q *Queue) ExtendTx(ctx context.Context, tx *sql.Tx, id ID, delay time.Duration) error

ExtendTx is like Extend, but within an existing transaction.

func (*Queue) Receive

func (q *Queue) Receive(ctx context.Context) (*Message, error)

Receive a Message from the queue, or nil if there is none.

func (*Queue) ReceiveAndWait

func (q *Queue) ReceiveAndWait(ctx context.Context, interval time.Duration) (*Message, error)

ReceiveAndWait for a Message from the queue, polling at the given interval, until the context is cancelled. If the context is cancelled, the error will be non-nil. See context.Context.Err.

func (*Queue) ReceiveTx

func (q *Queue) ReceiveTx(ctx context.Context, tx *sql.Tx) (*Message, error)

ReceiveTx is like Receive, but within an existing transaction.

func (*Queue) Send

func (q *Queue) Send(ctx context.Context, m Message) error

Send a Message to the queue with an optional delay.

func (*Queue) SendAndGetID

func (q *Queue) SendAndGetID(ctx context.Context, m Message) (ID, error)

SendAndGetID is like Send, but also returns the message ID, which can be used to interact with the message without receiving it first.

func (*Queue) SendAndGetIDTx

func (q *Queue) SendAndGetIDTx(ctx context.Context, tx *sql.Tx, m Message) (ID, error)

SendAndGetIDTx is like SendAndGetID, but within an existing transaction.

func (*Queue) SendTx

func (q *Queue) SendTx(ctx context.Context, tx *sql.Tx, m Message) error

SendTx is like Send, but within an existing transaction.

type SQLFlavor added in v0.4.0

// SQLFlavor names the SQL flavor a Queue is created with; set it through NewOpts.SQLFlavor.
type SQLFlavor int
const (
	// SQLFlavorSQLite is the zero value, so it is what NewOpts carries when SQLFlavor is left unset.
	SQLFlavorSQLite SQLFlavor = iota
	// SQLFlavorPostgreSQL is for a *sql.DB connected to PostgreSQL.
	SQLFlavorPostgreSQL
)

Directories

Path	Synopsis
http	Package http provides an HTTP handler for a goqite.Queue.
internal/sql
jobs	Package jobs provides a Runner which can run registered job [Func]s by name, when a message for it is received on the underlying queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL