Memberlist - Gossip Protocol in Go

This post shows a simple, “hello world” level of gossip protocol communication using hashicorp/memberlist, a Go library that manages cluster membership and member failure detection using a gossip-based protocol.

We’re going to build a proof of concept for this library: a simple service that gets and sets a variable whose value is shared across several instances of the service.

A gossip protocol is a peer-to-peer communication protocol where nodes in a cluster can broadcast messages to all other nodes, without each node needing to maintain a full list of the cluster members. The final code is available at github.com/totomz/blog-test-memberlist.

The basics of hashicorp/memberlist

Messages

Our nodes will send and receive messages; a message must implement the memberlist.Broadcast interface. Here is a simple implementation that broadcasts a string:

package main

import "github.com/hashicorp/memberlist"

type Message struct {
	Value string
}

// Message returns the byte form of the message
func (m *Message) Message() []byte {
	return []byte(m.Value)
}

// Invalidates checks if enqueuing the current broadcast
// invalidates a previous broadcast
func (m *Message) Invalidates(b memberlist.Broadcast) bool {
	return false
}

// Finished is invoked when the message will no longer
// be broadcast, either due to invalidation or to the
// transmit limit being reached
func (m *Message) Finished() {

}
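
To send a Message, we enqueue it on a memberlist.TransmitLimitedQueue. As a minimal usage sketch (here queue is a *memberlist.TransmitLimitedQueue, which we set up in the full example below):

// enqueue the message; memberlist will gossip it to the other nodes
queue.QueueBroadcast(&Message{Value: "hello"})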

Delegates

Delegate and EventDelegate are delegates for receiving data from, and providing data to, memberlist via callback mechanisms. You must provide an implementation of Delegate to hook into the gossip layer of memberlist. All of its methods must be thread-safe, as they can, and generally will, be called concurrently.

The EventDelegate is an optional interface that defines the callbacks to get notified of events in the cluster, such as when a node joins or leaves the pool.
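
A minimal sketch of an EventDelegate that just logs cluster events could look like the following; the SimpleEventDelegate type and its log lines are our own, only the three methods come from the library's EventDelegate interface:

package main

import (
	"log"
	"os"

	"github.com/hashicorp/memberlist"
)

var eventLog = log.New(os.Stdout, "", 0)

// SimpleEventDelegate logs cluster membership events
type SimpleEventDelegate struct{}

// NotifyJoin is invoked when a node is detected to have joined the cluster
func (d *SimpleEventDelegate) NotifyJoin(node *memberlist.Node) {
	eventLog.Printf("node joined: %s %s", node.Name, node.Addr)
}

// NotifyLeave is invoked when a node is detected to have left or failed
func (d *SimpleEventDelegate) NotifyLeave(node *memberlist.Node) {
	eventLog.Printf("node left: %s %s", node.Name, node.Addr)
}

// NotifyUpdate is invoked when a node's metadata is updated
func (d *SimpleEventDelegate) NotifyUpdate(node *memberlist.Node) {
	eventLog.Printf("node updated: %s %s", node.Name, node.Addr)
}

To use it, assign an instance to config.Events on the memberlist.Config (we don't use it in this example).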

Receiving a broadcast message

To receive a broadcast message, you must implement Delegate and provide the implementation to the library (see The full example below). To send/broadcast a message, we need an instance of memberlist.TransmitLimitedQueue (more on this in The full example).

package main

import (
	"log"
	"os"
	"sync/atomic"

	"github.com/hashicorp/memberlist"
)

var stdout = log.New(os.Stdout, "", 0)

// SimpleDelegate handles incoming messages, see NotifyMsg
type SimpleDelegate struct {
	// SharedVariableValue keeps a value shared across all members of the cluster
	SharedVariableValue atomic.Value

	// Broadcasts is the queue we use to broadcast a message
	Broadcasts *memberlist.TransmitLimitedQueue
}

// NotifyMsg is invoked when a user-data message is received
func (delegate *SimpleDelegate) NotifyMsg(message []byte) {
	value := string(message)
	stdout.Printf("GOT MESSAGE: %s", value)
	delegate.SharedVariableValue.Store(value)
}

// NodeMeta is used to retrieve metadata about the current node
// when broadcasting an alive message; unused in this example
func (delegate *SimpleDelegate) NodeMeta(limit int) []byte {
	return []byte("")
}

// GetBroadcasts is called when user data messages can be gossiped;
// we hand this off to the TransmitLimitedQueue
func (delegate *SimpleDelegate) GetBroadcasts(overhead, limit int) [][]byte {
	return delegate.Broadcasts.GetBroadcasts(overhead, limit)
}

// LocalState is used for a TCP push/pull of the full local state;
// unused in this example
func (delegate *SimpleDelegate) LocalState(join bool) []byte {
	// see https://github.com/hashicorp/memberlist/issues/184
	return []byte("")
}

// MergeRemoteState is invoked after a TCP push/pull with the
// remote node's local state; unused in this example
func (delegate *SimpleDelegate) MergeRemoteState(buf []byte, join bool) {
	stdout.Printf("MergeRemoteState?")
}

The full example

This main.go showcases how to interact with the library. The SimpleDelegate tracks the state of the cluster: SharedVariableValue is a variable whose value is shared across all cluster members, Broadcasts is the queue used to broadcast a message, and NotifyMsg handles incoming messages.

package main

import (
	"fmt"
	"github.com/hashicorp/memberlist"
	"log"
	"net/http"
	"os"
	"sync/atomic"
	"time"
)

var stdout = log.New(os.Stdout, "", 0)
var delegate SimpleDelegate

// region: Step 1: Implement the basic interfaces

// SimpleDelegate handles incoming messages, see NotifyMsg
type SimpleDelegate struct {
	// SharedVariableValue keeps a value shared across all members of the cluster
	SharedVariableValue atomic.Value

	// Broadcasts is the object we use to broadcast a message
	Broadcasts *memberlist.TransmitLimitedQueue
}

// NotifyMsg is invoked when a user-data message is received
func (delegate *SimpleDelegate) NotifyMsg(message []byte) {
	value := string(message)
	stdout.Printf("GOT MESSAGE: %s", value)
	delegate.SharedVariableValue.Store(value)
}

// NodeMeta is used to retrieve metadata about the current node
// when broadcasting an alive message; unused in this example
func (delegate *SimpleDelegate) NodeMeta(limit int) []byte {
	return []byte("")
}

// GetBroadcasts is called when user data messages can be gossiped;
// we hand this off to the TransmitLimitedQueue
func (delegate *SimpleDelegate) GetBroadcasts(overhead, limit int) [][]byte {
	return delegate.Broadcasts.GetBroadcasts(overhead, limit)
}

// LocalState is used for a TCP push/pull of the full local state;
// unused in this example
func (delegate *SimpleDelegate) LocalState(join bool) []byte {
	// see https://github.com/hashicorp/memberlist/issues/184
	return []byte("")
}

// MergeRemoteState is invoked after a TCP push/pull with the
// remote node's local state; unused in this example
func (delegate *SimpleDelegate) MergeRemoteState(buf []byte, join bool) {
	stdout.Printf("MergeRemoteState?")
}

// Message implements the memberlist.Broadcast interface, to serialize and enqueue a message
type Message struct {
	Value string
}

// Message returns the byte form of the message
func (m *Message) Message() []byte {
	return []byte(m.Value)
}

// Invalidates checks if enqueuing the current broadcast
// invalidates a previous broadcast
func (m *Message) Invalidates(b memberlist.Broadcast) bool {
	return false
}

// Finished is invoked when the message will no longer
// be broadcast, either due to invalidation or to the
// transmit limit being reached
func (m *Message) Finished() {

}

// endregion

func main() {

	// this is the list of members in the cluster
	var list *memberlist.Memberlist

	// Setup the broadcast queue
	broadcastQueue := new(memberlist.TransmitLimitedQueue)
	// RetransmitMult is the multiplier used to determine how many times a
	// message is retransmitted over gossip: the higher the number, the higher
	// the network traffic, but the lower the time to converge to a common state
	broadcastQueue.RetransmitMult = 5
	// NumNodes must return the current number of cluster members; the closure
	// captures `list`, which is assigned below by memberlist.Create
	broadcastQueue.NumNodes = func() int { return len(list.Members()) }

	// Configure the delegate
	delegate = SimpleDelegate{
		SharedVariableValue: atomic.Value{},
		Broadcasts:          broadcastQueue,
	}

	// Setup the library
	// DefaultLANConfig is a sane default configuration for nodes in a LAN
	config := memberlist.DefaultLANConfig()
	config.Delegate = &delegate

	list, err := memberlist.Create(config)
	PanicIfErr(err)

	// Join another node of the cluster.
	// If this is the first node, it can't join a cluster - there is no cluster yet
	joinServer := os.Getenv("NODE")
	if joinServer != "" {
		stdout.Printf("joining node %s", joinServer)
		n, err := list.Join([]string{joinServer})
		if err != nil {
			stdout.Panicf("error joining node: %v", err)
		}
		stdout.Printf("found %v nodes", n)
	}

	// Asynchronously poll the members of the cluster.
	// This goroutine is not required; it's just for debugging
	go func() {
		for {
			for _, member := range list.Members() {
				stdout.Printf("Member: %s %s\n", member.Name, member.Addr)
			}
			time.Sleep(1 * time.Minute)
		}
	}()

	// Finally, start the web server
	http.HandleFunc("/", requestHandler)
	err = http.ListenAndServe(":3333", nil)
	PanicIfErr(err)

}

func requestHandler(writer http.ResponseWriter, request *http.Request) {
	query := request.URL.Query()
	valueToSet := query.Get("set")

	// if there is a query param, set the value; otherwise, read the value
	if valueToSet == "" {
		lastValue := GetValue()
		_, _ = writer.Write([]byte(fmt.Sprintf("last value: %s", lastValue)))
		return
	}

	stdout.Printf("setting value to %s", valueToSet)
	SetValue(valueToSet)
	_, _ = writer.Write([]byte(fmt.Sprintf("SET last value: %s", valueToSet)))

}

func SetValue(valueToSet string) {
	stdout.Printf("setting value %s", valueToSet)
	// broadcasts are not delivered back to the local node,
	// so store the new value locally as well
	delegate.SharedVariableValue.Store(valueToSet)
	delegate.Broadcasts.QueueBroadcast(&Message{Value: valueToSet})
}

func GetValue() string {
	// Load returns nil until a value has been stored;
	// guard the type assertion to avoid a panic on the first read
	value, ok := delegate.SharedVariableValue.Load().(string)
	if !ok {
		return ""
	}
	return value
}

func PanicIfErr(err error) {
	if err != nil {
		panic(err)
	}
}
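
To try it out, run a single instance and exercise the endpoint with curl (port 3333 and the set query parameter come from the code above):

# broadcast a new value
curl "http://localhost:3333/?set=hello"
# read the last value
curl "http://localhost:3333/"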

Bonus: Local test with Docker

You can check out the full code from github.com/totomz/blog-test-memberlist. If you want to try it with shMake, you can easily start a cluster by opening 3 shells and running these commands:

# on the first shell, create the docker network and spawn the first container 
# that will listen on port 3031
docker network create gossip
shmake run --port=3031 --node=localhost

# on a separate shell, start 2 more nodes
# The ip for the first node can be found by inspecting docker:
#  docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' <container id>
shmake run --port=3032 --node=172.20.0.2
shmake run --port=3033 --node=172.20.0.2
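
Assuming shMake maps --port to the service's HTTP port on the host (an assumption based on the commands above), you can then watch the value converge across the cluster:

# set a value on the first node...
curl "http://localhost:3031/?set=hello"
# ...and read it back from the other two
curl "http://localhost:3032/"
curl "http://localhost:3033/"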