Addressed pub/sub architecture

I've used the publisher-subscriber design architecture in a bunch of projects including robots and computer games. And while it works well, there are some limitations that I find annoying. So solve some of these problems, I've thought up a combination of pub/sub with an addressing component.

The Publisher-subscriber design pattern is a message passing design. You have software component A that sends a message to software component B. A piece of middleware routes these messages based on topic. For example a radio receiver could output a message on the topic "TargetPosition", a kinematics software module could subscribe to these messages and emit a message on a topic "DesiredJointAngle". A motor driver could subscribe to "DesiredJointAngle" and translate this into i2c or PWM to actually drive a motor.
All nice and simple.

A normal pub/sub implementation

A typical message inside of a publisher-subscriber system contains a bunch of data and a topic: struct MessageContainer { data: Vec[u8] topic: Enum[TopicName] } The router then forwards from one software node to another based on these topics: // Grab all the messages from various software nodes all_messages = vec![] for node in system_node_list { all_messages.extend(node.outbox.drain()) } // Forward them to where they are needed for message in all_messages{ for node in system_node_list { if node.subscribes_to(message.topic){ node.inbox.push(message.copy()) } } }

The edge cases of pub/sub

How do you design the layout of topics and components? There is very little written about this, and very few example graphs. So here's the onboard system communication graph to give an example from a robot I built. Elipses are software modules, boxes are topics/messages.

  1. One thing you may notice is that there are a couple similar topics: CommandLegLeftServo, CommandLegRightServo, CommandLegSelectServo etc. These are all the same message contents - but in order to target a specific servo, they have to be on a different topic. In some ways it would be nice to have a single topic "CommandServo" that all the servo motion commands were sent down. This would mean that hooking up a debugger to that topic would result in you seeing the entire motion of the robot. Of course, then the message would (in typical pub/sub) have to specify the target servo, and each servo would have to handle receiving messages for other servos.
  2. This is closely related to handling system discovery. Consider a spaceship with a bunch of engines and thrusters. It would be nice to write a generic control system that can steer the spaceship even if some of the thrusters aren't working, or if extra ones are bolted on. If you have a topic-per-thruster this is involves dynamically creating topics and the control systems somehow needs to be informed of this.
  3. The main advantage of pub/sub is that each module is completely decoupled - no-one cares who is listening or where the messages come from. While that is nice in theory, it doesn't always work in practice. For this reason, ROS introduces the concept of services which allow you to make RPC's to a specific software module. This breaks the decoupled nature of pub/sub but it is needed for centralized resources such as kinematics descriptions, logging, etc.

Addressing to the rescue

As it turns out, both of these slightly edge-cases can be solved by introducing optional addressing to each message. Ie: extend the MessageContainer: struct MessageContainer { data: Vec[u8], topic: Enum[TopicName], to: Option[address], from: Option[address] } The router is then altered to respect the "to" field - with "None" being a broadcast to all subscribers. // Grab all the messages from various software nodes all_messages = vec![] for node in system_node_list { for message in node.outbox.drain() { message.from = Some(node.address) all_messages.push(message) } all_messages.extend(node.outbox.drain()) } // Forward them to where they are needed for message in all_messages{ for node in system_node_list { if node.subscribes_to(message.topic){ if message.address == None || message.address == node.address { node.inbox.push(message.copy()) } } } } Note that the router automatically fills in the "from" field of the message. This ensures there is always a "from" address but the software module that is sending the message doesn't have to know it.

Why is this useful?

Removing topic duplication

We now have an address contained in each message, so we don't need a dozen topics like CommandServoLegLeft, CommandServoLegRight anymore. So long as we know their address, we can just use a CommandServo message that contains the address of the correct servo driver.

Solving System Discovery

This also solves system discovery as a system can advertise it's presence, and subscribers to that advertisment now immediatly know how to respond to it. Using the spaceships/thrusters example:

// Our messages: struct ThrusterPresenceMessage { max_thrust: f32, thrust_vector: [f32, f32, f32] } struct ThrusterControlMessage { desired_thrust: f32 } struct SpacecraftMotionRequest { linear_acceleration: [f32, f32, f32] angular_acceleration: [f32, f32, f32] } // Our thruster control system needs to store a mapping of thrusters struct ThrusterControlSystemState { known_thrusters: hashmap[address: ThrusterPresenceMessage] } fn control_thrusters(control_system_state, new_messages, communication_node) { for message in communication_node.inbox.drain() { if message.topic == ThrusterPresenceMessage { control_system_state.known_thrusters[messages.from] = message.data } else if message.topic == SpacecraftMotionRequest { thruster_outputs = compute_thruster_outputs(message.data, control_system_state.known_thrusters) for thruster in thruster_outputs { communication_node.outbox.push(thruster) } } } } So long as each thruster periodically transmits a ThrusterPresenceMessage, the function "compute_thruster_outputs" will have a list of all available thrusters to work with to figure out the kinematics. The system would need to be extended to handle removal of thrusters and changes in center of mass but you get the idea.

Solving RPC's

Well, now a node can tell where a message is from, and can send one back to the same place. This makes writing systems that do RPC's trivial. For example a kinematics request that sends the result back to the system that requested it:

// Calculate the end effector position from a set of joints if message.topic == EndEffectorStateQuery { outbox.push( data: calculate_effector_position(message.data) to: message.from from: None ) }

And these are sent only to the code that queried it. To turn this into an actual RPC, you need the message data to contain some sort of request ID to distinguish multiple similar requests, but other than that, it allows many different path planning algorithms to use the same kinematics calculation software system while still being nicely decoupled.

You can do both broadcast and specific-address RPC's. A broadcast request (to = None) may be "ThrusterControlSystem here asking for all thrusters to give me their ThrusterPresenseMessages". A specific request (to = specific address) may be "ThrusterControlSystem asking thruster 7 for calibration information".

Other things to consider

There are some things that need to be worked out in this system. One of these edge cases is: can messages be sent without a topic, purely using the addressing system? The router implementation above doesn't support this and requires a topic, but I can imagine a system that doesn't.
I'm going to be using this in the game I'm currently developing (to be revealed soon), so I'll see how it goes.