Sessions Manager

The sessions manager is the actor that handles incoming (inbound) and outgoing (outbound) sessions. Its responsibilities include:

  • Create new sessions (i.e. starting a session actor)
  • Register and unregister sessions
  • Keep track of the status of the sessions
  • Periodically check the number of outgoing connections. If it is lower than the configured number of outgoing peers, the sessions manager will:
      • Request a new peer address from the PeersManager.
      • Send a message to the ConnectionsManager to request a new TCP connection to that peer.

The sessions manager is the actor that encapsulates the logic of the sessions library, defined in the witnet_p2p subcrate. The library manages the collection of sessions present at the Witnet node.

State

The state of the Sessions Manager is an instance of the Sessions library, which contains the collection of inbound and outbound sessions present at the Witnet node.

#[derive(Default)]
pub struct SessionsManager {
    // Registered sessions
    sessions: Sessions<Addr<Session>>,
}

The Sessions struct is generic over a type T, which is the type of the reference to the Session. As the actors paradigm is being used, this generic type T is the Addr of the Session actor.
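
To illustrate why the library is generic over T, the following is a minimal sketch of a container shaped like Sessions<T>. The field names, the SessionType variants, the String error type and the register method are assumptions made for illustration, not the actual witnet_p2p definitions. In the node, T would be Addr<Session>; here any handle type works.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;

/// Hypothetical session direction, mirroring the inbound/outbound split.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SessionType {
    Inbound,
    Outbound,
}

/// Illustrative stand-in for the library's `Sessions<T>` (not the real type).
/// `T` is whatever handle is used to reach a session.
pub struct Sessions<T> {
    inbound: HashMap<SocketAddr, T>,
    outbound: HashMap<SocketAddr, T>,
}

impl<T> Sessions<T> {
    /// Create an empty collection.
    pub fn new() -> Self {
        Sessions {
            inbound: HashMap::new(),
            outbound: HashMap::new(),
        }
    }

    /// Store a session handle; fails if the address is already registered
    /// for that session type.
    pub fn register(
        &mut self,
        kind: SessionType,
        addr: SocketAddr,
        handle: T,
    ) -> Result<(), String> {
        let map = match kind {
            SessionType::Inbound => &mut self.inbound,
            SessionType::Outbound => &mut self.outbound,
        };
        if map.contains_key(&addr) {
            return Err(format!("address {} already registered", addr));
        }
        map.insert(addr, handle);
        Ok(())
    }

    /// Number of registered inbound sessions.
    pub fn inbound_count(&self) -> usize {
        self.inbound.len()
    }

    /// Number of registered outbound sessions.
    pub fn outbound_count(&self) -> usize {
        self.outbound.len()
    }
}
```

Because the container never inspects T, the same bookkeeping can be exercised in tests with a plain value as the handle, while the actor system plugs in an actor address.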

Actor creation and registration

The creation of the sessions manager actor and its registration into the system registry are performed directly by the main process:

let sessions_manager_addr = SessionsManager::default().start();
System::current().registry().set(sessions_manager_addr);

API

Incoming messages: Others -> Sessions Manager

These are the messages supported by the sessions manager handlers:

Message | Input type | Output type | Description
Create | TcpStream, SessionType | () | Request to create a new session
Register | SocketAddr, Addr<Session>, SessionType | SessionsResult<()> | Request to register a new session
Unregister | SocketAddr, SessionType, SessionStatus | SessionsResult<()> | Request to unregister a session
Consolidate | SocketAddr, SessionType | SessionsResult<()> | Request to consolidate a session
Anycast<T> | T, bool | () | Request to send a T message to a random consolidated outbound Session (when the bool flag safu is true, only outbound sessions in consensus are used)
Broadcast<T> | T | () | Request to send a T message to all the consolidated outbound sessions

Handling these messages mostly amounts to calling the corresponding methods of the Sessions library. For example, the handler of the Register message would be implemented as:

pub type SessionsUnitResult = SessionsResult<()>;

/// Handler for Register message.
impl Handler<Register> for SessionsManager {
    type Result = SessionsUnitResult;

    fn handle(&mut self, msg: Register, _: &mut Context<Self>) -> Self::Result {
        // Call method register session from sessions library
        self.sessions
            .register_session(msg.session_type, msg.address, msg.actor)
    }
}

Because the SessionsManager is such a simple actor, no errors can arise from its own logic, so returning just the library's generic SessionsResult<()> may be the right thing to do.

Other actors communicate with the sessions manager as follows:

  1. Get the address of the sessions manager from the registry:

    // Get sessions manager address
    let sessions_manager_addr = System::current().registry().get::<SessionsManager>();
    
  2. Use any of the sending methods provided by the address (do_send(), try_send(), send()) to send a message to the actor:

    sessions_manager_addr
        .send(Register {
            address: self.address,
            actor: ctx.address(),
            session_type: self.session_type,
        })
        .into_actor(self)
        .then(|res, _act, ctx| {
            match res {
                Ok(Ok(_)) => debug!("Session successfully registered into the Session Manager"),
                _ => debug!("Session register into Session Manager failed")
            }
            actix::fut::ok(())
        })
        .wait(ctx);
    

Anycast

The handler for Anycast<T> messages calls the get_random_anycast_session method of the Sessions library to obtain a random Session and forwards the T message to it.

When the bool flag safu is true, only consolidated outbound sessions in consensus are used.
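
The selection rule described above can be sketched without the actor machinery. This is a minimal illustration, not the library's get_random_anycast_session: the OutboundSession struct, its consolidated and in_consensus flags, and both function names are hypothetical, and the random number is supplied by the caller so that the sketch stays free of external crates.

```rust
/// Hypothetical per-session bookkeeping; the field names are assumptions.
#[derive(Clone, Debug, PartialEq)]
pub struct OutboundSession<T> {
    pub handle: T,
    pub consolidated: bool,
    pub in_consensus: bool,
}

/// Sessions an anycast could target: consolidated ones, further
/// restricted to those in consensus when `safu` is true.
pub fn anycast_candidates<T>(
    sessions: &[OutboundSession<T>],
    safu: bool,
) -> Vec<&OutboundSession<T>> {
    sessions
        .iter()
        .filter(|s| s.consolidated)
        .filter(|s| !safu || s.in_consensus)
        .collect()
}

/// Pick one candidate using a caller-supplied random number.
/// Returns `None` when no session qualifies.
pub fn pick_anycast<T>(
    sessions: &[OutboundSession<T>],
    safu: bool,
    random: usize,
) -> Option<&T> {
    let candidates = anycast_candidates(sessions, safu);
    if candidates.is_empty() {
        return None;
    }
    // Copy the chosen reference out of the vector before borrowing its field.
    let chosen: &OutboundSession<T> = candidates[random % candidates.len()];
    Some(&chosen.handle)
}
```

With safu set to false every consolidated session is eligible; with safu set to true the candidate set can only shrink, so a caller should be prepared for None even when outbound sessions exist.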

The return value of the delegated call is processed by act.process_command_response(&res):

    /// Method to process session SendMessage response
    fn process_command_response<T>(
        &mut self,
        response: &Result<T::Result, MailboxError>,
    ) -> FutureResult<(), (), Self>
    where
        T: Message,
        Session: Handler<T>,
    {
        match response {
            Ok(_) => actix::fut::ok(()),
            Err(_) => actix::fut::err(()),
        }
    }

Broadcast

Similarly to the Anycast<T> handler, the handler for Broadcast<T> calls the get_all_consolidated_outbound_sessions method of the Sessions library and forwards the message T to all the returned addresses.

This handler does not do any error handling: all messages are assumed to be sent successfully.
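
As a rough illustration of this fire-and-forget forwarding, the sketch below uses std::sync::mpsc::Sender as a stand-in for a session address. The Peer struct and the broadcast function are hypothetical names; the real handler works with Addr<Session> values obtained from the Sessions library.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for an outbound session and its address.
pub struct Peer<T> {
    pub consolidated: bool,
    pub mailbox: Sender<T>,
}

/// Forward a clone of `msg` to every consolidated peer.
/// Send errors are deliberately ignored, matching the best-effort
/// behaviour described above. Returns how many sends were attempted.
pub fn broadcast<T: Clone>(peers: &[Peer<T>], msg: &T) -> usize {
    let mut attempted = 0;
    for peer in peers.iter().filter(|p| p.consolidated) {
        // A closed mailbox makes `send` fail; the failure is dropped.
        let _ = peer.mailbox.send(msg.clone());
        attempted += 1;
    }
    attempted
}
```

The returned count reflects attempts, not deliveries, which is the practical consequence of assuming that every send succeeds.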

Outgoing messages: Sessions Manager -> Others

These are the messages sent by the sessions manager:

Message | Destination | Input type | Output type | Description
GetConfig | ConfigManager | () | Result<Config, io::Error> | Request the configuration
GetRandomPeer | PeersManager | () | PeersResult<Option<SocketAddr>> | Request the address of a peer
OutboundTcpConnect | ConnectionsManager | SocketAddr | () | Request a TCP connection to an address
Anycast<GetPeers> | SessionsManager | () | () | Request to forward a GetPeers message to one randomly selected Session

GetConfig

This message is sent to the ConfigManager actor when the sessions manager actor is started.

The returned configuration is used to store some parameters in the Sessions state:

  • Server address: used to prevent the Witnet node from connecting to itself.
  • Inbound limit: used to reject incoming connections once the limit has been reached.
  • Outbound limit: used to stop requesting new outgoing connections once the limit has been reached.
  • Handshake timeout: sent to the session upon creation to set a time limit to the handshake process.
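
The four parameters listed above could be grouped as in the following sketch. The struct name, the field names, the integer widths and the two helper methods are assumptions chosen for illustration; the actual Config type and the way Sessions stores these values may differ.

```rust
use std::net::SocketAddr;
use std::time::Duration;

/// Hypothetical grouping of the parameters taken from the configuration.
pub struct SessionsConfig {
    /// Address the node listens on, used to avoid self-connections.
    pub server_address: SocketAddr,
    /// Maximum number of inbound sessions.
    pub inbound_limit: u16,
    /// Maximum number of outbound sessions.
    pub outbound_limit: u16,
    /// Time limit handed to each session for its handshake.
    pub handshake_timeout: Duration,
}

impl SessionsConfig {
    /// True while another incoming connection may still be accepted.
    pub fn accepts_inbound(&self, current_inbound: usize) -> bool {
        current_inbound < usize::from(self.inbound_limit)
    }

    /// True while more outgoing connections should be requested.
    pub fn wants_outbound(&self, current_outbound: usize) -> bool {
        current_outbound < usize::from(self.outbound_limit)
    }
}
```

Both helpers use a strict comparison, so a node that has exactly reached a limit neither accepts nor requests further connections of that kind.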

For further information, see ConfigManager.

GetRandomPeer

This message is sent to the PeersManager actor when the sessions manager actor detects that the number of registered outbound sessions is lower than the configured limit. This detection is done in a periodic bootstrap task.

The return value is then processed. If an error occurred, nothing is done. If the PeersManager returned an address, the SessionsManager checks whether it is valid and, if so, sends an OutboundTcpConnect message to the ConnectionsManager to start a new TCP connection to that address.

In this context, a valid address means that:

  • The address is not the Witnet node's own server address
  • The address does not belong to an already existing outbound connection
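
The two conditions can be written as a single predicate. This is a minimal sketch: the function name is_valid_peer and the use of a HashSet for the existing outbound addresses are assumptions, not the library's actual API.

```rust
use std::collections::HashSet;
use std::net::SocketAddr;

/// Hypothetical validity check for a peer address returned by the
/// PeersManager: it must not be the node's own server address and
/// must not already have an outbound connection.
pub fn is_valid_peer(
    candidate: &SocketAddr,
    server_addr: &SocketAddr,
    outbound: &HashSet<SocketAddr>,
) -> bool {
    candidate != server_addr && !outbound.contains(candidate)
}
```

Only when the predicate holds would an OutboundTcpConnect message be worth sending; otherwise the periodic task simply tries again with another address later.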

For further information, see PeersManager.

OutboundTcpConnect

This message is sent to the ConnectionsManager actor when the sessions manager receives a valid peer address from the PeersManager.

It is a best-effort message: its return value is not processed, and the sessions manager actor does not even wait for a response after sending it.

If the operation was successful, the sessions manager will know it by other means (a session will be created and registered into the SessionsManager). If the operation was not successful, then the sessions manager will detect in its next periodic bootstrap task that there are no new outbound connections and try to create a new one.
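
The retry behaviour described above can be modelled as a loop over bootstrap rounds. This is a simplified simulation under stated assumptions: run_bootstrap is a hypothetical name, at most one connection is requested per round, and the connect closure stands in for the whole chain of events (TCP connection, session creation, registration) whose success the real actor only observes indirectly.

```rust
/// Simulate `ticks` rounds of the periodic bootstrap task.
/// `connect` models whether a best-effort connection request made in a
/// given round eventually results in a registered outbound session.
/// Returns the number of outbound sessions after the last round.
pub fn run_bootstrap<F>(limit: usize, ticks: u32, mut connect: F) -> usize
where
    F: FnMut(u32) -> bool,
{
    let mut outbound = 0usize;
    for tick in 0..ticks {
        // Only request a connection while below the configured limit.
        if outbound < limit {
            // No response is awaited in the real actor; here a success
            // simply shows up as one more registered session.
            if connect(tick) {
                outbound += 1;
            }
        }
    }
    outbound
}
```

For example, with a limit of 2 and a closure that only succeeds on odd rounds, the limit is reached after the fourth round and no further requests are made. A failed round needs no special handling because the next round sees the same shortfall and tries again.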

For further information, see ConnectionsManager.

Anycast

Because the SessionsManager has an Anycast<T> handler that forwards a T message to one randomly selected Session, it periodically sends this message to itself.

It is a best-effort message: its return value is not processed, and the SessionsManager actor does not even wait for a response after sending it.

This message causes the SessionsManager to forward a GetPeers message to one randomly selected Session actor.

Further information

The full source code of the SessionsManager can be found at sessions_manager.rs.