package neko
ScalaNeko Framework
ScalaNeko is a framework designed to help with the prototyping of distributed algorithms. It is loosely based on the Neko framework [1] which was programmed in Java more than a decade earlier, mainly by Péter Urbán.
Whereas the original Neko framework was designed for performance evaluation and modeling, the main focus of ScalaNeko is to support the teaching of distributed algorithms. Hence, the current version of ScalaNeko supports only simulated execution. However, we still intend to support actual distributed execution in a future version, and thus to provide a full replacement for the original Neko.
1. Architecture
To use ScalaNeko effectively, it helps to understand its general architecture. ScalaNeko has several important entities:
- The system is what handles the execution engine within the virtual machine and the initialization procedure. There is exactly one instance running for every virtual machine. The system also holds a discrete event simulator. See neko.Main and neko.kernel.NekoSystem.
- The network simulates the behavior of a network, and is responsible for transmitting messages between processes. In the current version, it runs on top of a discrete-event simulation. See neko.network.Network and neko.kernel.sim.Simulator.
- The processes are the basic unit of concurrency; each represents a virtual computer connected through a network. Every process has a unique identity represented by a neko.PID. A process does nothing by itself and is merely a shell for protocols. See neko.NekoProcess and neko.ProcessConfig.
- The protocols are the actual logic of the system and implement the algorithms. A process holds one or many protocols, which are organized as a stack. There are two kinds of protocols: active and reactive ones. While active protocols carry their own flow of execution, that is, act as a thread, concurrently with the system, the reactive protocols only execute code as a reaction to incoming events. See neko.ActiveProtocol, neko.ReactiveProtocol, neko.Protocol, and neko.ProtocolUtils.
- Protocols and processes exchange information through events. There are two types of events: signals and messages. Signals allow protocols within the same process to notify each other. In contrast, messages allow protocol instances to communicate across different processes. In other words, only messages are transmitted through the network. See neko.Event, neko.Signal, neko.UnicastMessage, neko.MulticastMessage, and neko.Wrapper.
A simplified view of the architecture of an execution of ScalaNeko is depicted below:
    +-------------------------------------------------------+
    |       process p1                    process pn        |
    |  +-------------------+         +-------------------+  |
    |  | +---------------+ |         | +---------------+ |  |
    |  | | protocol p1:A | |         | | protocol pn:A | |  |
    |  | +-------------+-+ |         | +-------------+-+ |  |
    |  |   |           |   |   ...   |   |           |   |  |
    |  | +-+-----------V-+ |         | +-+-----------V-+ |  |
    |  | | protocol p1:B | |         | | protocol pn:B | |  |
    |  | +-------------+-+ |         | +-------------+-+ |  |
    |  +---|-----------|---+         +---|-----------|---+  |
    |      |           |                 |           |      |
    |  +---+-----------V-----------------+-----------V---+  |
    |  |                     network                     |  |
    |  +-------------------------------------------------+  |
    |                 +------------------+                  |
    |                 |    simulator     |                  |
    |                 +------------------+      system      |
    +-------------------------------------------------------+
Creating a ScalaNeko application typically involves the following steps:
- Implement the protocols. An application requires at least one active protocol, and possibly a number of reusable reactive ones.
- Each protocol is likely to define its own message types. The most appropriate location for doing so is the companion object of the protocol. Messages are best defined as a case class, so that they are immutable by default and the code for pattern matching is generated automatically by the compiler.
- Create a process initializer that instantiates and connects the protocols of the processes.
- Create a main object that provides the basic parameters of the execution, such as the total number of processes to create and their initializer.
The initialization proceeds roughly as illustrated below:
         creates            creates
    Main ------> NekoSystem ------> Network
                            creates
                     ''     ------> ProcessInitializer
                            creates             creates
                     ''     =====>> NekoProcess =====>> Protocol
2. Creating protocols
A protocol can be either active or reactive. An active protocol is one that executes its own thread, concurrently with that of the other protocols or processes. In contrast, a reactive protocol only executes as a reaction to events, and does not do anything otherwise.
2.1 Active protocols
An active protocol is typically defined as a subclass of neko.ActiveProtocol.
An active protocol has its own thread of control. The code of the protocol is implemented in its method neko.ActiveProtocol.run, which must be defined in the subclass. This code is executed concurrently with the rest of the system.
An active protocol has access to operations for sending and receiving messages. New messages are sent with the method neko.ActiveProtocol.SEND, while messages are received through blocking calls to neko.ActiveProtocol.Receive, as illustrated below. Note that, in order to receive messages of a certain type, the protocol must register for this type by calling neko.ActiveProtocol.listenTo.
    class PingPong(c: ProcessConfig) extends ActiveProtocol(c, "ping-pong") {
      val next = me.map{i => (i+1) % N}
      var record = Set.empty[Event]

      listenTo(classOf[Ping])
      listenTo(classOf[Pong])

      def run(): Unit = {
        SEND(Ping(me, next))
        Receive {
          case Ping(from, _) => SEND(Pong(me, from))
          case Pong(from, _) => SEND(Ping(me, from))
        }
        Receive { m =>
          record += m
        }
      }
    }
It is also possible to override the method neko.ActiveProtocol.onReceive. By doing so, messages that are matched by onReceive are processed reactively upon arrival, while those that are not matched by onReceive are stored into the receive queue and must be handled by a blocking call to neko.ActiveProtocol.Receive.
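The dispatch rule described above can be modelled in a few lines of plain Scala. The sketch below is a self-contained illustration and does not use the ScalaNeko classes: the names Inbox, arrive, receive, Tick, and Data are hypothetical, and receive is non-blocking here, whereas Receive in ScalaNeko is described as blocking.

```scala
import scala.collection.mutable

// Hypothetical event types, for illustration only.
sealed trait Event
case object Tick extends Event
final case class Data(payload: String) extends Event

// Standalone model (not the ScalaNeko implementation): events matched by
// `onReceive` are handled on arrival; the others wait in a receive queue.
final class Inbox(onReceive: PartialFunction[Event, Unit]) {
  private val queue = mutable.Queue.empty[Event]

  def arrive(e: Event): Unit =
    if (onReceive.isDefinedAt(e)) onReceive(e)
    else { queue += e; () }

  // Non-blocking stand-in for a blocking receive: oldest queued event, if any.
  def receive(): Option[Event] =
    if (queue.isEmpty) None else Some(queue.dequeue())
}
```

In this model, a Tick is consumed by the partial function as soon as it arrives, while a Data event stays queued until receive() is called.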
2.2 Reactive protocols
Most protocols in a process are reactive. A reactive protocol is usually sandwiched between a network and an application (or a lower-level protocol and a higher-level one). The simplest way to implement one is by extending neko.ReactiveProtocol. The information has two flows: downstream and upstream. This is illustrated in the figure below.
             application
        |                 ^
        V                 |
    +----------------------------+
    | onSend        DELIVER(...) |
    |                            |
    |     Reactive protocol      |
    | SEND(...)        onReceive |
    +----------------------------+
        |                 ^
        V                 |
               network
For the downstream flow (from application to network), the code of the protocol is implemented in the method neko.ReactiveProtocol.onSend, usually implemented as a scala.PartialFunction which reacts as appropriate to each event. The protocol can itself send messages through the neko.ReactiveProtocol.SEND method.
For the upstream flow (from network to application), the code of the protocol is implemented in the method neko.ReactiveProtocol.onReceive, also implemented as a scala.PartialFunction which reacts appropriately to each incoming event. Events of a certain type are delivered to the protocol only if it registers for that event type by calling the neko.ReactiveProtocol.listenTo method. The protocol can deliver a message to the application through the method neko.ReactiveProtocol.DELIVER.
Note that the two flows are not mutually exclusive. It is perfectly valid, and even frequent, for a protocol to call neko.ReactiveProtocol.DELIVER in neko.ReactiveProtocol.onSend, or to call neko.ReactiveProtocol.SEND in neko.ReactiveProtocol.onReceive.
3. Defining new events (messages and signals)
Let's start with a little bit of terminology. An event denotes anything that happens in the system and is represented by the abstract class neko.Event. Events can be of two types:
- A signal is an event that occurs within one process, and can go from one protocol to another, but never cross process boundaries. It is represented by the subclasses of neko.Signal.
- A message is an event that crosses process boundaries, but is typically (but not necessarily) interpreted by the same protocol in the target process. It is represented by the subclasses of neko.Message.
A message can be "top-level" or a "wrapper". A top-level message is one that is created by the sending protocol. It has its own identity, as well as a source and destinations. In contrast, a wrapper is simply a shell that extends the information of an existing message. It retains the same identity, source, and destinations, but can add its own information. This results in messages of three types:
- A neko.MulticastMessage is a top-level message with multiple destinations. See the example below on how to define a new message:
    case class Snapshot(from: PID, to: Set[PID]) extends MulticastMessage
  NB: The arguments *must* be named from and to.
- A neko.UnicastMessage is a top-level message with a single destination process.
    case class Token(from: PID, to: PID) extends UnicastMessage
  NB: The arguments *must* be named from and to.
- A neko.Wrapper is a shell that wraps an existing message. A wrapper can also wrap another wrapper, not only a top-level message. A wrapper preserves the identity, the source, and the destinations of the message it wraps.
case class SequencedMessage(msg: Message, sn: Int) extends Wrapper(msg)
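To make the distinction between top-level messages and wrappers concrete, here is a minimal standalone model in plain Scala. It deliberately does not use the ScalaNeko classes: the definitions of PID, Message, Token, Wrapper, and Sequenced below are simplified stand-ins written for this illustration, and the way identity is stored (a plain Long) is an assumption.

```scala
// Standalone model for illustration; these are NOT the ScalaNeko classes.
final case class PID(value: Int)

sealed trait Message {
  def from: PID
  def to: Set[PID]
  def id: Long
}

// A top-level message carries its own identity, source, and destinations.
final case class Token(from: PID, to: Set[PID], id: Long) extends Message

// A wrapper forwards identity, source, and destinations to what it wraps.
sealed abstract class Wrapper extends Message {
  def msg: Message
  final def from: PID = msg.from
  final def to: Set[PID] = msg.to
  final def id: Long = msg.id
}

// Adds a sequence number; `msg` may itself be another wrapper.
final case class Sequenced(msg: Message, sn: Int) extends Wrapper
```

Wrapping a Token twice in this model still reports the identity, source, and destinations of the original Token, which is the property the text attributes to wrappers.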
4. Initialization of a process
While processes are created automatically, their protocols are not, and must be initialized and connected. This is done through a process initializer, by providing an instance of neko.ProcessInitializer, whose sole role is to create the protocols of a process and combine them.
    ProcessInitializer { p =>
      val app  = new PingPong(p)
      val fifo = new FIFOChannel(p)
      app --> fifo
    }
In the above example, each process is initialized by executing the code shown. The code creates two protocols while registering them into the object p given as argument (which represents the process being initialized). Then, the two protocols are connected such that all SEND operations of protocol app are handed to protocol fifo. The SEND operations of protocol fifo use the default target, which is the network interface of the process.
It is also possible to initialize processes differently, by discriminating based on the identifier of the process to initialize. That identifier is obtained from the argument with p.pid.
5. Setting up a new system
A new instance of a ScalaNeko system is created and configured by creating an object that extends neko.Main. The resulting object becomes a main object and is thus executable (neko.Main is a subclass of scala.App).
Class neko.Main requires parameters to be set, such as the network topology and the process initializer, as illustrated below:
    object PingPongApp extends Main(topology.Clique(3))(
      ProcessInitializer { p => ... }
    )
Future versions of ScalaNeko are planned to make it possible to define many more parameters, such as the network topologyDescriptor.
References
[1] Péter Urbán, Xavier Défago, André Schiper: Neko: A Single Environment to Simulate and Prototype Distributed Algorithms. J. Inf. Sci. Eng. 18(6): 981-997 (2002).
Contributors
Lead architect: Xavier Défago
Other contributors:
- Naoyuki Onuki (trace system; integration with NekoViewer)
Type Members
- abstract class ActiveProtocol extends ProtocolImpl with ManagedActivity with ProtocolUtils with Receiving with ListenerUtils with Runnable with LazyLogging with Tracing
Basic class for defining active protocols, typically those at the top of the protocol stack.
An active protocol must redefine the ActiveProtocol.run method to implement the code of the protocol there. The code in run() can use several convenience methods, such as ActiveProtocol.SEND, Receive, ProtocolUtils.me, ProtocolUtils.N, and ProtocolUtils.ALL.
- abstract class BroadcastMessage extends Message
Base class to define a new broadcast message. The effective destinations are the entire set of neighbors, as defined by the sender at the time of sending.
Typical declarations of top-level messages:
    case class Hello(from: PID) extends BroadcastMessage
    case class Heartbeat(from: PID, time: Time) extends BroadcastMessage
Instantiation of top-level messages:
    import neko._
    val next = me.map { i => (i+1) % N }
    val m1 = Hello(me)
    val m2 = Heartbeat(me, now)
    val m3 = FIFO(me, next, seqnum, msg)  // FIFO is the unicast message declared under UnicastMessage
- sealed trait Event extends Immutable
Basic trait to define all processable events in ScalaNeko. There are two basic kinds of events:
- Events that are internal to a given process and used as notifications across protocol layers are called "signals". Such events must inherit from the class Signal, and are always local to the process on which they were created.
- Events that eventually transit through a network are called "messages". Such events must inherit from one of the subclasses of Message, namely:
  - UnicastMessage is for messages with a single destination process that are generated by a protocol.
  - MulticastMessage is for messages with multiple destinations that are generated by a protocol.
  - Wrapper is for wrapping existing messages with added information (e.g., a sequence number), as is typically done for payload messages. Even without adding new information, a typical use case is to ensure that the message will first be processed by the same protocol (at the destination process) before its content is delivered to the higher layer.
Note that all events must be immutable. It is an error to create mutable messages and, even though this is not necessarily detected by the compiler, mutable messages will almost surely lead to faulty behavior that is difficult to reproduce and thus will be extremely difficult to detect.
- trait ID[T] extends AnyRef
Generic type to represent all kinds of identifiers in ScalaNeko. The type parameter refers to the type of the value held by the identifier, and depends on the kind of identifier.
The subclasses of ID documented in this package are PID, ProtoID, MessageID, SignalID, and TaskID.
- T: type of the value encapsulated in the identifier
- trait Listener extends Receiver
Declares that the protocol implementing the trait is able to "listen" to messages. Typically, this is used indirectly through ListenerUtils.
- trait ListenerUtils extends Listener
Mixin trait that provides support for listening to a message type.
- class Main extends AnyRef
Basic class used to define the system.
An object that extends this class will act as a replacement for the main object, and also as a replacement for the configuration file used in the original Neko. The parameters are used to create the system. They must provide the network topology (from which the number of processes is inferred), as well as an initializer for the processes (see neko.ProcessInitializer).
For instance, the code below declares a system consisting of three processes, each of which is initialized by the process initializer provided:
object MyMain extends Main(topology.Clique(3))(ProcessInitializer { p=> ... })
- sealed trait Message extends Event
Basic trait to define all messages. A message is an event that is supposed to propagate through the network, and thus cross process boundaries.
This trait defines the basic information that every message must hold. It is not possible to extend it directly; rather, messages are defined by extending one of its four subclasses: UnicastMessage, MulticastMessage, BroadcastMessage, or Wrapper.
The choice of a parent class (among the four classes listed above) depends on the nature of the message to define.
- UnicastMessage is for a message that is generated by the protocol (typically a control message) and for which multiple destinations make no sense (e.g., a token that circulates on a logical ring).
- MulticastMessage is for a message that is generated by the protocol and may have multiple destinations.
- BroadcastMessage is for a message that is generated by the protocol and is broadcast to all neighbors of the sending process.
- Wrapper is for adding information to an existing message, typically obtained from the application (or a higher-level protocol). This occurs, for instance, when a protocol needs to add sequence numbers or other similar information to an existing payload message.
NB: Just like other events, all instances of Message and its subclasses must be immutable. Defining a mutable subclass is an error and results in undefined behavior.
- case class MessageID(value: Long) extends ID[Long] with Ordered[MessageID] with Product with Serializable
Class to represent the identifier of a message. Although it is unlikely (though not forbidden) that an application programmer will rely directly on message identifiers, they are not entirely transparent since they must be generated properly when creating new messages.
To keep matters simple and to avoid obscuring the code of protocols, it is very strongly recommended to enforce auto-incrementation when declaring new messages. This is done by declaring the id field last and assigning it a default value obtained through the method MessageID.autoIncrement, as illustrated below:
    case class Token(from: PID, to: PID, id: MessageID = MessageID.autoIncrement())
      extends UnicastMessage(from, to, id)
- value: the raw value of the ID
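The auto-increment idiom relies on a default argument that is re-evaluated at every construction. The sketch below is a standalone model of that idea and not the ScalaNeko implementation: the counter based on AtomicLong and the simplified PID and Token definitions (Token does not extend any message class here) are assumptions made for illustration.

```scala
import java.util.concurrent.atomic.AtomicLong

// Standalone model, not the ScalaNeko class: an ordered message identifier.
final case class MessageID(value: Long) extends Ordered[MessageID] {
  def compare(that: MessageID): Int = java.lang.Long.compare(value, that.value)
}

object MessageID {
  // Assumed implementation: a process-wide counter, safe under concurrency.
  private val counter = new AtomicLong(0L)
  def autoIncrement(): MessageID = MessageID(counter.getAndIncrement())
}

final case class PID(value: Int)

// The default argument is evaluated at each construction, so every message
// created without an explicit `id` receives a fresh identifier.
final case class Token(from: PID, to: PID, id: MessageID = MessageID.autoIncrement())
```

Two tokens built with the same from and to then differ by their id, whereas copy() keeps the id of the original, so a copied message retains its identity in this model.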
- abstract class MulticastMessage extends Message
Base class to define a new message with multiple destination processes (multicast).
Each new message must be a subclass of UnicastMessage, of BroadcastMessage, or of MulticastMessage. For a protocol to add information to an existing message, it is necessary to define instead a wrapper message; i.e., a subclass of Wrapper.
Typical declarations of top-level messages:
    case class Snapshot(from: PID, to: Set[PID]) extends MulticastMessage
    case class ViewChange(from: PID, to: Set[PID], viewNum: Long, epochNum: Long) extends MulticastMessage
    case class Heartbeat(from: PID, to: Set[PID], sentAt: Time, sn: Long) extends MulticastMessage
The fields to and from must be defined. Thus, to work properly, it is essential that new messages are created with from and to arguments as above.
Instantiation of top-level messages:
    import neko._
    val m1 = Snapshot(me, neighbors)
    val m2 = ViewChange(me, ALL, myView, myEpoch)
    val m3 = Heartbeat(me, neighbors, now, seqnum)
Defining actual messages as a case class is a recommended practice and highly convenient since it allows for pattern matching without requiring any additional work.
    def onReceive = {
      case Snapshot(from,_) if from < me =>
        // e.g., getting a snapshot from a process with lower id
      case ViewChange(_,_,view,epoch) if view > currentView =>
        currentView = view
        // ...
    }
- trait NamedEntity extends AnyRef
Created by defago on 02/05/2017.
- class NekoProcess extends NamedEntity with LazyLogging
- case class PID(value: Int) extends ID[Int] with Ordered[PID] with Product with Serializable
Class to represent process identifiers. The field value corresponds to the index of the process and is guaranteed to be unique. When the execution is simulated on a single machine, the PIDs are consecutive numbers from PID(0) to PID(N-1). In addition, there is a total order on process identifiers, which allows them to be compared.
- value: the index of the process
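Because identifiers are consecutive and totally ordered, computing the successor on a logical ring, as the examples on this page do with me.map { i => (i+1) % N }, reduces to modular arithmetic. The following standalone model illustrates this; it is not the ScalaNeko class, and the map method and the Ring object are assumptions written to mirror that idiom.

```scala
// Standalone model, not the ScalaNeko class: an ordered process identifier.
final case class PID(value: Int) extends Ordered[PID] {
  def compare(that: PID): Int = Integer.compare(value, that.value)

  // Applies `f` to the index, mirroring the `me.map { ... }` idiom.
  def map(f: Int => Int): PID = PID(f(value))
}

object Ring {
  // Successor of `me` on a logical ring of `n` processes numbered 0 to n-1.
  def successor(me: PID, n: Int): PID = me.map(i => (i + 1) % n)
}
```

With three processes, the successor of PID(2) wraps around to PID(0), and comparisons such as PID(0) < PID(2) follow the order of the indices.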
- class ProcessConfig extends AnyRef
- trait ProcessInitializer extends Function[ProcessConfig, Unit]
Basic trait for implementing the process initialization.
A process needs an initializer class to instantiate its protocols, register them, and connect them together. Such a class must be a subclass of ProcessInitializer and contain the code for initialization as shown in the example below.
    class LamportMutexInitializer extends ProcessInitializer {
      forProcess { p =>
        // create protocols
        val app   = new MutexApplication(p)
        val clock = new protocol.LamportClock(p)
        val mutex = new LamportMutex(p, clock)
        val fifo  = new protocol.FIFOChannel(p)
        // connect protocols
        app   --> mutex
        mutex --> clock
        clock --> fifo
      }
    }
Below is an alternative way to create an initializer, better suited for use as an argument to the neko.Main class.
    ProcessInitializer { p =>
      // create protocols
      val app   = new MutexApplication(p)
      val clock = new protocol.LamportClock(p)
      val mutex = new LamportMutex(p, clock)
      val fifo  = new protocol.FIFOChannel(p)
      // connect protocols
      app   --> mutex
      mutex --> clock
      clock --> fifo
    }
Without a block, the initializer does nothing by default, thus resulting in an "empty" process.
- case class ProtoID(value: String) extends ID[String] with Product with Serializable
Class to represent protocol identifiers. The field value corresponds to the nickname of the protocol.
- value: the nickname of the protocol
- trait Protocol extends NamedEntity
Defines the basic operations that a protocol must provide.
An application programmer will not use this trait directly, but rather use its two main subclasses: ActiveProtocol for active protocols, and ReactiveProtocol for reactive protocols.
- abstract class ProtocolImpl extends Protocol with ProtocolUtils
Core class for implementing a reactive protocol.
This provides the basic functionality for implementing reactive protocols. It is designed to be used in conjunction with Sending and Receiving. It is more convenient to extend the class ReactiveProtocol, which has the most relevant traits mixed in.
- trait ProtocolUtils extends AnyRef
Provides basic functionality to the protocol that implements this trait.
The concrete protocol must provide an implementation for process, system, and ProtocolUtils.dispatcher. It provides the ability to connect and the following fields:
- me is the identifier of the current process.
- N is the total number of processes in the system.
- ALL is the set of identifiers for all processes in the system (including me).
- neighbors is the set of identifiers of all neighbor processes.
- abstract class ReactiveProtocol extends ProtocolImpl with ListenerUtils with Sending with Receiving with Tracing
Convenience class to replace ProtocolImpl with all common traits already mixed in.
A protocol that sits as a middle layer between application and network can extend this class, which provides all of the basic functionalities.
A subclass must provide an implementation of the methods Sending.onSend and Receiving.onReceive.
- trait Receiver extends AnyRef
- trait Receiving extends Receiver
- trait Sender extends NamedEntity
Provides the send operation.
- trait Sending extends Sender
Defines the behavior for a protocol that provides the sending of messages as a capability.
The trait is used to complement the functionality of an instance of ProtocolImpl by declaring that the instance is capable of sending a message through the network. Thus, it is usually used together with Receiving. For instance,
    class FIFOChannel (p: NekoProcess, nickname: String)
      extends ProtocolImpl(p, nickname) with Sending with Receiving {
      def onSend = {
        case ...
      }
    }
There are cases where this trait could be used without Receiving, for instance by a protocol that only records what is sent to it, without sending anything through the actual network. This is however a very rare situation, and can safely be ignored as far as the lecture I445 is concerned.
- abstract class Signal extends Event
Basic class for internal events.
All events used for signaling within a process should inherit from this class. Just as for messages, a protocol must listen to a signal type for such an internal event to be delivered successfully. Unlike with messages, no error is raised if no protocol listens to it.
Example:
    case object Terminate extends Signal
    case class ChangeID(id: Int) extends Signal
- case class SignalID(value: UUID) extends ID[UUID] with Product with Serializable
Class to represent unique identifiers for instances of Signal. The field SignalID.value corresponds to a universally unique identifier (java.util.UUID), which is unique with very high probability for any two different instances of Signal. This identifier will seldom be used by application programmers.
- value: the unique ID of the signal
- trait StashedReceive extends AnyRef
- case class TaskID(value: Long) extends ID[Long] with Ordered[TaskID] with Product with Serializable
Identifier of a task obtained from scheduling an action through an instance of Timer. Task identifiers are totally ordered, which allows them to be compared.
- value: identifying number of the task
- class Timer extends AnyRef
Supports the scheduling of delayed and periodic tasks.
An example illustrating the use of a timer:
    val timer = new Timer(...)
    val myTask = timer.periodically(Time.second) { t =>
      // do something every second
      timer.continueWhile(hasSomethingToDo)
    }
    val myOtherTask = timer.scheduleAfter(Time.second) { t =>
      // do something one second later
    }
    timer.cancel(myOtherTask) // ... or not.
- abstract class UnicastMessage extends Message
Base class to define a new message with one single destination process (unicast).
Each new message must be a subclass of UnicastMessage, of BroadcastMessage, or of MulticastMessage. For a protocol to add information to an existing message, it is necessary to define instead a wrapper message; i.e., a subclass of Wrapper.
Typical declarations of top-level messages:
    case class Token(from: PID, to: PID) extends UnicastMessage
    case class Ack(from: PID, to: PID) extends UnicastMessage
    case class FIFO(from: PID, to: PID, sn: Long, payload: Message) extends UnicastMessage
The fields to and from must be defined. Thus, to work properly, it is essential that new messages are created with from and to arguments as above.
Instantiation of top-level messages:
    import neko._
    val next = me.map { i => (i+1) % N }
    val m1 = Token(me, next)
    val m2 = Ack(me, next)
    val m3 = FIFO(me, next, seqnum, msg)
Defining actual messages as a case class is a recommended practice and highly convenient since it allows for pattern matching without requiring any additional work.
    def onReceive = {
      case Token(from,_) if from < me =>
        // e.g., getting a token from a process with lower id
      case FIFO(_,_,sn,m) if deliverNext == sn =>
        deliverNext += 1
        DELIVER(m)
    }
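The FIFO case in the pattern match above delivers a message only when its sequence number is the next one expected. A full FIFO layer would typically also buffer messages that arrive early. The sketch below models that behavior in plain Scala, independently of ScalaNeko: FifoReceiver, its deliver callback, and the buffering policy are assumptions made for illustration, not the behavior of the FIFOChannel protocol referenced on this page.

```scala
import scala.collection.mutable

// Standalone model, not the ScalaNeko FIFOChannel: delivers payloads in
// sequence-number order, buffering those that arrive early.
final class FifoReceiver[A](deliver: A => Unit) {
  private var deliverNext = 0L
  private val pending = mutable.Map.empty[Long, A]

  def onReceive(sn: Long, payload: A): Unit = {
    // Ignore duplicates and sequence numbers that were already delivered.
    if (sn >= deliverNext && !pending.contains(sn)) pending(sn) = payload
    // Deliver for as long as the next expected number is available.
    while (pending.contains(deliverNext)) {
      deliver(pending.remove(deliverNext).get)
      deliverNext += 1
    }
  }
}
```

Feeding this receiver sequence numbers 1, 0, 2 delivers the payloads in the order 0, 1, 2, and a late duplicate of 1 is dropped.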
- abstract class Wrapper extends Message
Basic class for messages used to add information to existing messages (wrappers).
The resulting message retains all information from the original, and can add some specific to the protocol.
Typical message definition:
    case class SequencedMessage(msg: Message, sn: Int) extends Wrapper(msg)
Typical use:
    var sequenceNumber = 0
    def onSend = {
      case m =>
        SEND(SequencedMessage(m, sequenceNumber))
        sequenceNumber += 1
    }
Value Members
- object MessageID extends Serializable
- object NekoProcess
- object ProcessInitializer
- object TaskID extends Serializable
- object Timeout extends Signal with Product with Serializable
Event generated by ActiveProtocol.Receive.withTimeout upon a timeout.