Lazy Pirate Client

source link: https://bbengfort.github.io/2017/07/lazy-pirate/
July 14, 2017 · 2 min · Benjamin Bengfort

In the [last post]({% post_url 2017-07-13-zmq-basic %}) I discussed a simple REQ/REP pattern for ZMQ. However, by itself REQ/REP is pretty fragile. First, every REQ must be answered by a REP before the client can send again, and a server can only handle one request at a time. Moreover, if the server fails in the middle of a reply, the client hangs waiting for a response that never arrives. We need a more reliable REQ/REP, which is actually the subject of an entire chapter in the ZMQ book.

For my purposes, I want to ensure that repliers (servers) can fail without taking out the client. The server can simply call sock.Send(msg, zmq.DONTWAIT) to deal with clients that drop out before the communication is complete. Server failure is a bit more difficult to deal with, however. Client-side reliability is based on timeouts and retries to deal with failed messages. ZMQ calls this the Lazy Pirate Pattern.
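The timeout-and-retry idea can be sketched without ZMQ at all. The block below is a minimal illustration using only the Go standard library, not the client from this post: `sendFunc`, `lazyPirate`, `flakyPeer`, `demo`, and `demoTimeout` are hypothetical names, and a channel stands in for the socket. Each call to `send` plays the role of "reset the socket and send the message again".

```go
package main

import (
	"errors"
	"time"
)

// demoTimeout is an arbitrary per-attempt timeout chosen for this sketch.
const demoTimeout = 20 * time.Millisecond

// errOffline is returned once every attempt has timed out.
var errOffline = errors.New("peer offline, message dropped")

// sendFunc is a hypothetical stand-in for "reset the socket and send msg".
// It returns a channel that delivers the reply, if one ever arrives.
type sendFunc func(msg string) <-chan string

// lazyPirate sends msg at most attempts times and waits up to timeout for
// each reply. It returns the first reply, or errOffline if none arrives.
func lazyPirate(send sendFunc, msg string, attempts int, timeout time.Duration) (string, error) {
	for i := 0; i < attempts; i++ {
		replies := send(msg) // a fresh attempt, as after a socket reset
		select {
		case reply := <-replies:
			return reply, nil
		case <-time.After(timeout):
			// No reply in time: abandon this attempt and try again.
		}
	}
	return "", errOffline
}

// flakyPeer simulates a server that silently ignores the first drops
// requests and echoes every request after that.
func flakyPeer(drops int) sendFunc {
	seen := 0
	return func(msg string) <-chan string {
		ch := make(chan string, 1)
		seen++
		if seen > drops {
			ch <- "echo: " + msg
		}
		return ch
	}
}

// demo sends "ping" to a peer that drops the first two attempts.
func demo() (string, error) {
	return lazyPirate(flakyPeer(2), "ping", 3, demoTimeout)
}
```

Calling `demo()` from a `main` function or a test succeeds on the third attempt. With a peer that drops more requests than there are attempts, `lazyPirate` returns `errOffline` instead of blocking. The real client, shown further down, applies the same loop to a ZMQ REQ socket.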

The code below is a pretty big chunk, but it creates a Client object that wraps a socket and performs lazy pirate sends. The primary code is in the Reset() and Send() methods. Reset() calls Close(), which sets the linger to zero so that pending messages are discarded and the socket closes immediately, and then reconnects, resetting the socket state so that it can send messages again. This is “brute force but effective and reliable”.

The Send() method fires off a message, then uses a zmq.Poller with a timeout to wait for a reply within that time limit. If a reply arrives, then great! Otherwise we decrement our retries, reset the socket, and send the message again. If we’re out of retries, there is nothing to do but reset the connection and return an error. The code is here:

    package zmqnet

    import (
        "fmt"
        "time"

        zmq "github.com/pebbe/zmq4"
    )

    //===========================================================================
    // Lazy Pirate Client
    //===========================================================================

    // New creates a client with a remote address and context. Can also specify
    // the number of retries per request and the timeout before retrying.
    func New(addr string, ctx *zmq.Context, retries int, timeout time.Duration) *Client {
        return &Client{
            addr:    addr,
            ctx:     ctx,
            retries: retries,
            timeout: timeout,
        }
    }

    // Client communicates with a remote peer.
    type Client struct {
        addr    string
        ctx     *zmq.Context
        sock    *zmq.Socket
        retries int
        timeout time.Duration
    }

    // Connect to the remote peer
    func (c *Client) Connect() (err error) {
        // Create the socket
        if c.sock, err = c.ctx.NewSocket(zmq.REQ); err != nil {
            return err
        }

        // Connect to the address
        if err = c.sock.Connect(c.addr); err != nil {
            return err
        }

        return nil
    }

    // Close the connection to the remote peer
    func (c *Client) Close() error {
        // Set linger to 0 so the connection closes immediately
        if err := c.sock.SetLinger(0); err != nil {
            return err
        }

        // Close the socket and return
        return c.sock.Close()
    }

    // Reset the socket by setting the linger to 0, closing it, then reconnecting.
    func (c *Client) Reset() error {
        // Close the socket
        if err := c.Close(); err != nil {
            return err
        }

        // And reconnect
        return c.Connect()
    }

    //===========================================================================
    // Transport Methods
    //===========================================================================

    // Send a message to the remote peer in a safe fashion, lazy pirate style.
    // Returns a reply from the server or an error if unsuccessful.
    func (c *Client) Send(message string) (string, error) {
        // Send the initial message
        if _, err := c.sock.Send(message, zmq.DONTWAIT); err != nil {
            return "", err
        }

        // The number of retries for this message
        retries := c.retries

        for {
            // Poll socket for a reply, with timeout
            poller := zmq.NewPoller()
            poller.Add(c.sock, zmq.POLLIN)
            sockets, err := poller.PollAll(c.timeout)
            if err != nil {
                return "", err
            }

            // Process a reply and exit if the reply is valid. Otherwise close
            // socket and retry the message for num retries. Abandon after we
            // exhaust the number of allocated retries.
            if sock := sockets[0]; sock.Events&zmq.POLLIN != 0 {
                // Success! Return the reply from the server.
                return sock.Socket.Recv(0)

            } else if retries--; retries <= 0 {
                // Number of retries has expired, reset the connection and error.
                // Comparing with <= keeps a non-positive retry count from
                // looping forever.
                if err := c.Reset(); err != nil {
                    return "", fmt.Errorf("message dropped, could not reset connection: %s", err)
                }

                return "", fmt.Errorf("connection to %s is offline, message dropped", c.addr)

            } else {
                // No response from the server in timeout, retrying the message.
                // Old socket is confused, reset it to be able to send again.
                if err := c.Reset(); err != nil {
                    return "", err
                }

                // Resend the original message
                if _, err := c.sock.Send(message, zmq.DONTWAIT); err != nil {
                    return "", err
                }
            }
        }
    }

This code is fairly lengthy, but as it turns out, clients and servers on either side of REQ/REP share most of the same wrapper code for managing the context, socket, and connect/bind calls. So far it has been very reliable in my code, allowing servers to drop out and fail without blocking clients or other nodes in the network.

