1

GitHub - rverton/pgjobs: Go and PostgreSQL job queue blueprint

 1 year ago
source link: https://github.com/rverton/pgjobs
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

pgjobs, a dead simple postgres job queueing mechanism

This project aims to be a blueprint for your own job queue solution with Go and PostgreSQL. It is recommended to fork this project and adjust the job queue with features to your own needs.

By using Postgres SKIP LOCKED feature, this allows to make a performant, non-blocking and independent queue. The technique is described here and here.

  • Performat: Non-blocking queue mechanism
  • Robust: Jobs will be 'freed' again when a worker crashes
  • Failed jobs will be retried until MAX_RETRIES, current attempt is passed as argument
  • Schedule jobs for later execution with EnqueueAt(ctx, job, "queuename", timeAt)
  • Support for multiple queues

Example usage

A complete, runnable examples can be found under ./example/.

First define a pgjobs.Job, for example in ./jobs/emailUser.go:

package jobs

import (
	"encoding/json"
	"log"

	"github.com/rverton/pgjobs"
)

type EmailUser struct {
	Email string
}

func NewEmailUser(email string) *EmailUser {
	return &EmailUser{
		Email: email,
	}
}

// the action which should be executed
func (e EmailUser) Perform(attempt int32) error {
	log.Printf("emailing %v, attempt=%v", e.Email, attempt)
	return nil
}

// this is boilerplate code and does not need to be modified
func (e EmailUser) Load(data string) (pgjobs.Job, error) {
	var n EmailUser
	err := json.Unmarshal([]byte(data), &n)
	return n, err
}

You can then setup a queue, (optionally) enforce the jobs table schema, and work on queued jobs.

package main

import (
	"context"
	"database/sql"
	"example/jobs"
	"log"
	"os"

	_ "github.com/jackc/pgx/v5/stdlib"
	"github.com/rverton/pgjobs"
)

func main() {
	db, err := sql.Open("postgres", os.Getenv("DB_URL"))
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()

	// initiate queue with postgres connection
	queue := pgjobs.NewQueue(db)
	if err := queue.SetupSchema(ctx); err != nil {
		panic(err)
	}

	job := jobs.NewEmailUser("[email protected]")
    
	// enequeue an example job for immediate execution
	if err = queue.Enqueue(context.Background(), job, "default"); err != nil {
		log.Printf("error enqueueing: %v", err)
	}
    
	// enequeue an example job for execution in 10s+
	if err = queue.EnqueueAt(context.Background(), job, "default", time.Now().Add(10*time.Second)); err != nil {
		log.Printf("error enqueueing: %v", err)
	}

	// start worker and pass all processable jobs
	queues := []string{"default"}
	if err := queue.Worker(ctx, queues, &jobs.EmailUser{}); err != nil {
		log.Println(err)
	}
}
  • Make job processing more robust by using a transaction
  • Implement attempt handling
  • Add error handling and retries?
  • Add scheduled execution
  • Remove github.com/lib/pq dependency
  • Add priority queuing

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK