เนื้อหา

Messaging อย่างง่าย Go + NATS

ระหว่างที่รอเพื่อนร่วมทางที่ร้านกาแฟ พอมีเวลาว่างเลยนึกขึ้นได้ว่าเราเดินทางกับแบบ asynchronous task เลยนี่หว่า ใครออกมาก่อนทำก่อนถึงก่อน ในระหว่างที่คนอื่นก็เดินทางของตัวเองไป แล้วสุดท้ายมาเจอกันปลายทาง ก็เลยลองเขียน Go + NATS ทำ Pub/Sub ง่อย ๆ บันทึกไว้ว่ามันทำยังไง

ของที่ต้องมีก่อนจะเริ่ม

Go

ก็เขียนด้วย Go ละนะ Install ได้ตาม official ของ Go ได้เลยจ้า Download and install

Docker

เพื่อความง่ายในการใช้ NATS ก็ลง Docker ไว้กันด้วยนะ

curl https://get.docker.com | bash

NATS

ต่อมาที่ NATS ที่จะใช้เป็นตัวกลางของ Messaging ของเราโดยจะเปิดช่องทำงานผ่าน port 4222 by default ละก็ 8222 เปิดไว้เผื่ออยากยิง API เข้าไปเช็ค monitoring อะไรก็ว่ากันไป ด้วยไฟล์ compose.yaml

services:
    nats:
        image: nats
        container_name: nats-server
        command: ["--http_port", "8222"]
        networks:
            - nats-net
        ports:
            - 4222:4222
            - 8222:8222

networks:
    nats-net:
        external: false
        name: nats-net

เริ่ม coding

Publisher

เริ่มจากคนสร้างงานสร้างอาชีพก่อนเลยคือ publisher.go เป็นคนลงประกาศ Publish ผ่านทางหัวข้อ messages ทุก ๆ 5 วินาทีด้วย time Ticker เรื่อย ๆ จนกว่าได้โดนสั่งให้หยุด

package main

import (
    "fmt"
	"os"
	"os/signal"
	"syscall"
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    // Create a channel to listen for OS signals
	closeChan := make(chan os.Signal, 1)

	// Notify the channel when SIGINT (Ctrl+C) or SIGTERM is received
	signal.Notify(closeChan, os.Interrupt, syscall.SIGTERM)

	// Create a ticker that ticks every 5 seconds
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

    // Connect to NATS server
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatalf("Error connecting to NATS server: %v", err)
    }
    defer nc.Close()

	fmt.Println("Press Ctrl+C to exit...")

    // Publish messages every 5 seconds until terminate signal
    for {
        select {
            case <-ticker.C:
                // Publish messages every 5 seconds
                msg := fmt.Sprintf("Publishing %d", i)
                err := nc.Publish("messages", []byte(msg))
                if err != nil {
                    log.Fatalf("Error publishing message: %v", err)
                }
                fmt.Printf("Published: %s\n", msg)
            case sig := <-closeChan:
                // This block executes when a termination signal is received
                fmt.Printf("Received signal: %s. Exiting...\n", sig)
                return // Exit the loop and terminate the program
		}
    }
}

Subscriber

แรงงานทาส subscriber.go ที่รอรับงานผ่านการ subscribe หัวข้อ messages เรื่อย ๆ จนกว่าได้โดนสั่งให้หยุด

package main

import (
    "fmt"
	"os"
	"os/signal"
	"syscall"
    "log"

    "github.com/nats-io/nats.go"
)

func main() {
    // Create a channel to listen for OS signals
	closeChan := make(chan os.Signal, 1)

	// Notify the channel when SIGINT (Ctrl+C) or SIGTERM is received
	signal.Notify(closeChan, os.Interrupt, syscall.SIGTERM)

    // Connect to NATS server
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatalf("Error connecting to NATS server: %v", err)
    }
    defer nc.Close()

    // Subscribe to the "messages" subject
    _, err = nc.Subscribe("messages", func(msg *nats.Msg) {
        fmt.Printf("Received message: %s\n", string(msg.Data))
    })
    if err != nil {
        log.Fatalf("Error subscribing to subject: %v", err)
    }

    // This block executes when a termination signal is received
    sig := <-closeChan:
    fmt.Printf("Received signal: %s. Exiting...\n", sig)
    return // Exit the loop and terminate the program
}

รวมร่าง

Start NATS

docker compose up -d

Start Pubplisher

go run publisher.go

Start Subscriber

go run subscriber.go

จบรอดูผลงานผ่าน console ทุก ๆ 5 วินาทีจนกว่าจะสั่ง terminate