Asynchronous Message Queue in Golang

I recently wrote an article about how to use Celery in Django projects. Celery is a distributed task queue for Python. I thought “OK, cool, now let’s take a look at how something like it can be implemented from scratch”.

Concept of Message Queue

The project has 3 parts:

  1. Client, which is sending tasks,
  2. Broker, which is receiving tasks and storing them for the worker,
  3. Worker, which is collecting tasks and consuming them.

In my case there is just 1 task: the client pushes a comma-separated list of integers, and the worker sums them and prints the result. Both client and worker need to know how to identify tasks (in my case it’s going to be by a key) and what data structure to expect, but neither needs to know anything about the other’s implementation.
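The shared contract described above can be sketched as a constant plus a small encoder. This is only an illustration: the article's code keeps TaskKey as a local variable in each main function, and encodePayload is a hypothetical helper that does not exist in the project.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// TaskKey is the queue name both sides agree on (same value as in the article).
// Making it a package-level constant is an assumption of this sketch.
const TaskKey = "task"

// encodePayload is a hypothetical helper: it turns integers into the
// comma-separated form the worker expects.
func encodePayload(nums []int) string {
	parts := make([]string, len(nums))
	for i, n := range nums {
		parts[i] = strconv.Itoa(n)
	}
	return strings.Join(parts, ",")
}

func main() {
	fmt.Println(TaskKey, encodePayload([]int{7, 2})) // prints "task 7,2"
}
```

Keeping the key and the payload format in one place means a change to either is made once instead of separately in the client and the worker.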

Implementation of a Message Queue in Go

Let’s begin with the client, which is the simplest one. When fired, it pushes a new value to the queue under the task key.

package main

func main() {
	TaskKey := "task"
	broker := New()
	broker.sendTask(TaskKey, "7,2")
}

The worker is also quite simple:

package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

func handleTask(value string) {
	sum := 0
	values := strings.Split(value, ",")
	for i := range values {
		v, _ := strconv.Atoi(values[i])
		sum += v
	}
	fmt.Println("Sum of ", value, " is: ", strconv.Itoa(sum))
}

func main() {
	TaskKey := "task"
	broker := New()
	for {
		time.Sleep(time.Second)
		taskValue := broker.receiveTask(TaskKey)
		if len(taskValue) > 0 {
			go handleTask(taskValue)
		}
	}
}

It runs a loop that checks every second whether new data to process has appeared. If so, it fires a goroutine to handle it: the value is split, the sum is computed and the result is printed out.

Now, to the broker, which is shared by both parts:

package main

import (
	"github.com/go-redis/redis/v7"
)

// Broker is a helper for sending and receiving messages.
type Broker struct {
	client *redis.Client
}

// New is a constructor for Broker.
func New() Broker {
	return Broker{
		redis.NewClient(&redis.Options{
			Addr:     "localhost:6379",
			Password: "",
			DB:       0,
		}),
	}
}

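// sendTask pushes value onto the left end of the list stored under key.
func (b Broker) sendTask(key string, value string) {
	err := b.client.LPush(key, value).Err()
	if err != nil {
		panic(err)
	}
}

// receiveTask pops a value from the right end of the list stored under key.
// Errors, including redis.Nil for an empty list, are ignored, so an empty
// string means there is nothing to process.
func (b Broker) receiveTask(key string) string {
	result, _ := b.client.RPop(key).Result()
	return result
}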

It provides a function, New(), which creates a Broker instance. All configuration is hardcoded and the storage is Redis, but the same two methods could be implemented on top of RabbitMQ or anything else that can hold tasks. The broker pushes new values onto the left end of a Redis list (LPUSH) and pops them from the right end (RPOP), which makes for a FIFO type of queue. Errors are mostly ignored for the sake of simplicity.
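To see why pushing on the left and popping from the right gives FIFO order, and what a swappable backend might look like, here is a minimal in-memory sketch. TaskBroker, memoryBroker and newMemoryBroker are hypothetical names introduced for illustration. The article's Broker is a concrete struct, not an interface, and this version is not safe for concurrent use.

```go
package main

import "fmt"

// TaskBroker is a hypothetical interface with the same two methods as the
// article's Broker; any backend could implement it.
type TaskBroker interface {
	sendTask(key, value string)
	receiveTask(key string) string
}

// memoryBroker keeps one slice per key; index 0 is the "left" end.
type memoryBroker struct {
	lists map[string][]string
}

func newMemoryBroker() *memoryBroker {
	return &memoryBroker{lists: make(map[string][]string)}
}

// sendTask prepends the value, mirroring what LPUSH does to a Redis list.
func (m *memoryBroker) sendTask(key, value string) {
	m.lists[key] = append([]string{value}, m.lists[key]...)
}

// receiveTask removes and returns the last element, mirroring RPOP.
// It returns "" when the list is empty.
func (m *memoryBroker) receiveTask(key string) string {
	l := m.lists[key]
	if len(l) == 0 {
		return ""
	}
	v := l[len(l)-1]
	m.lists[key] = l[:len(l)-1]
	return v
}

func main() {
	var b TaskBroker = newMemoryBroker()
	b.sendTask("task", "7,2")
	b.sendTask("task", "1,2,3")
	fmt.Println(b.receiveTask("task")) // prints "7,2": the oldest value comes out first
	fmt.Println(b.receiveTask("task")) // prints "1,2,3"
}
```

A stand-in like this could also be handy for exercising the client and worker logic without a running Redis server.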

Ready and Go!

Let’s build and run our code. I’ve prepared a Makefile, so just type:

$ make
go build -o client client.go broker.go
go build -o worker worker.go broker.go

Now, start the client (it’ll put data on the queue to be consumed later):

$ ./client

And let’s consume it.

$ ./worker
Sum of  7,2  is:  9

Additional resources

Another example of queue implementation in Python.

My code on Gitlab.

Last word

I chose Golang in order to learn something about it. It seemed a reasonable backup for Python, which is slow and has issues with multithreading. Today I’ve learned once again one of Python’s greatest strengths: the ease of programming. It handles stupid, simple tasks way more easily. Let’s take another look at the Go code for splitting a string and summing numbers:

	sum := 0
	values := strings.Split(value, ",")
	for i := range values {
		v, _ := strconv.Atoi(values[i])
		sum += v
	}
	fmt.Println("Sum of ", value, " is: ", strconv.Itoa(sum))

Not only does it still have a bug (try running it against something that has a space inside, like 1, 2: the ignored Atoi error silently turns the second item into 0), but it also needed 2 extra imports, strings and strconv. The equivalent Python is:

print(f'Sum of {value} is: {sum(int(i) for i in value.split(","))}')
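For completeness, here is one way the Go side could deal with the space problem mentioned above. It is a sketch, not the code from the repository: sumCSV is a hypothetical name, and trimming each item and returning an error are choices made here for illustration.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// sumCSV adds up a comma-separated list of integers, ignoring spaces around
// each item. It returns an error for any item that is not an integer.
func sumCSV(value string) (int, error) {
	sum := 0
	for _, part := range strings.Split(value, ",") {
		n, err := strconv.Atoi(strings.TrimSpace(part))
		if err != nil {
			return 0, fmt.Errorf("invalid item %q: %w", part, err)
		}
		sum += n
	}
	return sum, nil
}

func main() {
	sum, err := sumCSV("1, 2")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("Sum is:", sum) // prints "Sum is: 3"
}
```

It is still several times longer than the Python one-liner, so the point about ease of programming stands.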

Goodnight.
