Simple Messaging with Go + NATS

While waiting for a friend at a coffee shop, I had some free time, and it occurred to me that traveling to meet someone works a lot like running asynchronous tasks. Whoever leaves first does their work first and arrives first. In the meantime, the others are on their own journeys, and eventually we all meet at the destination. So I decided to write a simple Go + NATS Pub/Sub example to document how it’s done.

Prerequisites

Go

Since we’re writing in Go, install it by following the "Download and install" instructions on the official Go website.

Docker

To make running NATS easy, let’s also install Docker. On Linux, Docker’s convenience script does this in one step:

curl https://get.docker.com | bash

NATS

Next up is NATS, which acts as the intermediary for our messages. The server listens for client connections on port 4222 by default, and we also enable its HTTP monitoring endpoint on port 8222 in case you want to query it for monitoring or other purposes. Here’s the compose.yaml file:

services:
    nats:
        image: nats
        container_name: nats-server
        command: ["--http_port", "8222"]
        networks:
            - nats-net
        ports:
            - 4222:4222
            - 8222:8222

networks:
    nats-net:
        external: false
        name: nats-net
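
To check that the monitoring port is reachable, here is a minimal sketch that queries the server’s /varz endpoint from Go. It assumes the compose file above is running on localhost and reads only a handful of fields from the JSON response; the varz struct, parseVarz, and fetchVarz are names made up for this example, not part of any NATS library.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"time"
)

// varz holds a few fields from the /varz monitoring response.
// (Hypothetical type for this sketch; the real response has many more fields.)
type varz struct {
	ServerID    string `json:"server_id"`
	Version     string `json:"version"`
	Connections int    `json:"connections"`
	InMsgs      int64  `json:"in_msgs"`
	OutMsgs     int64  `json:"out_msgs"`
}

// parseVarz decodes the JSON body returned by the monitoring endpoint.
func parseVarz(body []byte) (varz, error) {
	var v varz
	err := json.Unmarshal(body, &v)
	return v, err
}

// fetchVarz requests the given URL and decodes the response.
func fetchVarz(url string) (varz, error) {
	client := &http.Client{Timeout: 3 * time.Second}
	resp, err := client.Get(url)
	if err != nil {
		return varz{}, err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return varz{}, err
	}
	return parseVarz(body)
}

func main() {
	// Assumes the monitoring port from compose.yaml is published on localhost.
	v, err := fetchVarz("http://localhost:8222/varz")
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error querying monitoring endpoint: %v\n", err)
		os.Exit(1)
	}
	fmt.Printf("NATS %s (%s): %d connections, %d msgs in, %d msgs out\n",
		v.Version, v.ServerID, v.Connections, v.InMsgs, v.OutMsgs)
}
```

Once the publisher and subscriber are running, the connection and message counters should go up each time you run it.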

Let’s start coding

Publisher

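Let’s start with the job creator, publisher.go. It publishes a message on the messages subject (NATS’s term for a topic) every 5 seconds using a time.Ticker until it’s told to stop. Both programs import github.com/nats-io/nats.go, so first initialize a module in the project directory (go mod init with a module name of your choice) and fetch the client with go get github.com/nats-io/nats.go.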

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// Create a channel to listen for OS signals
	closeChan := make(chan os.Signal, 1)

	// Notify the channel when SIGINT (Ctrl+C) or SIGTERM is received
	signal.Notify(closeChan, os.Interrupt, syscall.SIGTERM)

	// Create a ticker that ticks every 5 seconds
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	// Connect to NATS server
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatalf("Error connecting to NATS server: %v", err)
	}
	defer nc.Close()

	fmt.Println("Press Ctrl+C to exit...")

	// Publish messages every 5 seconds until a termination signal arrives
	for {
		select {
		case <-ticker.C:
			// The ticker fired: publish the next message
			msg := fmt.Sprintf("Publishing %d", time.Now().Unix())
			err := nc.Publish("messages", []byte(msg))
			if err != nil {
				log.Fatalf("Error publishing message: %v", err)
			}
			fmt.Printf("Published: %s\n", msg)
		case sig := <-closeChan:
			// This block executes when a termination signal is received
			fmt.Printf("Received signal: %s. Exiting...\n", sig)
			return // Exit the loop and terminate the program
		}
	}
}

Subscriber

The worker, subscriber.go, waits to receive jobs by subscribing to the messages subject until it’s told to stop.

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/nats-io/nats.go"
)

func main() {
	// Create a channel to listen for OS signals
	closeChan := make(chan os.Signal, 1)

	// Notify the channel when SIGINT (Ctrl+C) or SIGTERM is received
	signal.Notify(closeChan, os.Interrupt, syscall.SIGTERM)

	// Connect to NATS server
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatalf("Error connecting to NATS server: %v", err)
	}
	defer nc.Close()

	// Subscribe to the "messages" subject; the callback runs for each message
	_, err = nc.Subscribe("messages", func(msg *nats.Msg) {
		fmt.Printf("Received message: %s\n", string(msg.Data))
	})
	if err != nil {
		log.Fatalf("Error subscribing to subject: %v", err)
	}

	fmt.Println("Press Ctrl+C to exit...")

	// Block here until a termination signal is received
	sig := <-closeChan
	fmt.Printf("Received signal: %s. Exiting...\n", sig)
}
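
The subscriber’s callback only prints the payload, but since the publisher embeds a Unix timestamp in each message, a handler could also recover it. Here is a small sketch of that, assuming the "Publishing <unix seconds>" payload format used by publisher.go; parsePublishedAt is a hypothetical helper, not part of nats.go.

```go
package main

import (
	"fmt"
	"time"
)

// parsePublishedAt extracts the Unix timestamp from a payload such as
// "Publishing 1700000000" and returns it as a time.Time.
func parsePublishedAt(payload []byte) (time.Time, error) {
	var sec int64
	if _, err := fmt.Sscanf(string(payload), "Publishing %d", &sec); err != nil {
		return time.Time{}, fmt.Errorf("unexpected payload %q: %w", payload, err)
	}
	return time.Unix(sec, 0), nil
}

func main() {
	// Build a payload the same way publisher.go does, then parse it back.
	payload := []byte(fmt.Sprintf("Publishing %d", time.Now().Unix()))

	sentAt, err := parsePublishedAt(payload)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Printf("Sent at %s\n", sentAt.Format(time.RFC3339))
}
```

Inside the subscription callback, the same helper could be called with msg.Data.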

Putting it all together

Start NATS

docker compose up -d

Start Publisher

go run publisher.go

Start Subscriber

go run subscriber.go

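Now just watch both consoles: a new message should appear every 5 seconds until you terminate the processes. Core NATS does not store messages, so anything published before the subscriber connects is not delivered to it.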