  • package neko

    ScalaNeko Framework

    ScalaNeko is a framework designed to help with the prototyping of distributed algorithms. It is loosely based on the Neko framework [1], which was programmed in Java more than a decade earlier, mainly by Péter Urbán.

    Whereas the original Neko framework was designed for performance evaluation and modeling, the main focus of ScalaNeko is to support the teaching of distributed algorithms. Hence, the current version of ScalaNeko only supports simulated execution. However, we still intend to support actual distributed execution in a future version, and thereby provide a full replacement for the original Neko.

    1. Architecture

    To use ScalaNeko effectively, it helps to understand its general architecture, which is built around the following entities:

    • The system handles the execution engine within the virtual machine and the initialization procedure. There is exactly one instance running for every virtual machine. The system also holds a discrete event simulator. See neko.Main and neko.kernel.NekoSystem.
    • The network simulates the behavior of a network, and is responsible for transmitting messages between processes. In the current version, it runs on top of a discrete-event simulation. See neko.network.Network and neko.kernel.sim.Simulator.
    • The processes are the basic units of concurrency; each one represents a virtual computer connected through the network. Every process has a unique identity represented by a neko.PID. A process does nothing by itself and is merely a shell for protocols. See neko.NekoProcess and neko.ProcessConfig.
    • The protocols are the actual logic of the system and implement the algorithms. A process holds one or many protocols, which are organized as a stack. There are two kinds of protocols: active and reactive ones. While active protocols carry their own flow of execution, that is, act as a thread, concurrently with the system, the reactive protocols only execute code as a reaction to incoming events. See neko.ActiveProtocol, neko.ReactiveProtocol, neko.Protocol, and neko.ProtocolUtils.
    • Protocols and processes exchange information through events. There are two types of events: signals and messages. Signals allow protocols within the same process to notify each other. In contrast, messages allow protocol instances to communicate across different processes. In other words, only messages are transmitted through the network. See neko.Event, neko.Signal, neko.UnicastMessage, neko.MulticastMessage, and neko.Wrapper.

    A simplified view of the architecture of an execution of ScalaNeko is depicted below:

    +-------------------------------------------------------+
    |       process p1                    process pn        |
    |  +-------------------+         +-------------------+  |
    |  | +---------------+ |         | +---------------+ |  |
    |  | | protocol p1:A | |         | | protocol pn:A | |  |
    |  | +-------------+-+ |         | +-------------+-+ |  |
    |  |   |           |   |   ...   |   |           |   |  |
    |  | +-+-----------V-+ |         | +-+-----------V-+ |  |
    |  | | protocol p1:B | |         | | protocol pn:B | |  |
    |  | +-------------+-+ |         | +-------------+-+ |  |
    |  +---|-----------|---+         +---|-----------|---+  |
    |      |           |                 |           |      |
    |  +---+-----------V-----------------+-----------V---+  |
    |  |                      network                    |  |
    |  +-------------------------------------------------+  |
    |                  +------------------+                 |
    |                  |     simulator    |                 |
    |                  +------------------+       system    |
    +-------------------------------------------------------+

    Creating a ScalaNeko application typically involves the following steps:

    1. Implement the protocols. At a minimum, an application needs one active protocol, and possibly a number of reusable reactive ones.
    2. Define the message types. Each protocol is likely to define its own, and the most appropriate location for doing so is the companion object of the protocol. Messages are best defined as case classes, so that they are immutable and the compiler automatically generates the code for pattern matching.
    3. Create a process initializer that instantiates and connects the protocols of the processes.
    4. Create a main object that provides the basic parameters of the execution, such as the total number of processes to create and their initializer.

    The initialization proceeds roughly as illustrated below:

         creates            creates
    Main ------> NekoSystem ------> Network
                            creates
                      ''    ------> ProcessInitializer
                            creates             creates
                      ''    =====>> NekoProcess =====>> Protocol

    2. Creating protocols

    A protocol can be either active or reactive. An active protocol is one that executes its own thread, concurrently with that of the other protocols or processes. In contrast, a reactive protocol only executes as a reaction to events, and does not do anything otherwise.

    2.1 Active protocols

    An active protocol is typically defined as a subclass of neko.ActiveProtocol.

    An active protocol has its own thread of control. The code of the protocol is implemented in its method neko.ActiveProtocol.run, which must be defined in the subclass. This code is executed concurrently with the rest of the system.

    An active protocol has access to operations for sending and receiving messages. New messages are sent with the method neko.ActiveProtocol.SEND, while messages are received through blocking calls to neko.ActiveProtocol.Receive, as illustrated below. Note that, in order to receive messages of a certain type, the protocol must register for this type by calling neko.ActiveProtocol.listenTo.

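    class PingPong(c: ProcessConfig) extends ActiveProtocol(c, "ping-pong")
    {
      // Successor of this process in a ring of N processes.
      val next = me.map{i => (i+1) % N}
      // Events collected by the last Receive below.
      var record = Set.empty[Event]

      // Register for the message types this protocol wants to receive.
      listenTo(classOf[Ping])
      listenTo(classOf[Pong])

      def run(): Unit =
      {
        // Start the exchange by pinging the next process.
        SEND(Ping(me, next))

        // Block until a message arrives, then reply to its sender.
        Receive {
          case Ping(from, _) => SEND(Pong(me, from))
          case Pong(from, _) => SEND(Ping(me, from))
        }

        // Block again and record whatever arrives.
        Receive { m =>
          record += m
        }
      }
    }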

    It is also possible to override the method neko.ActiveProtocol.onReceive. By doing so, messages that are matched by onReceive are processed reactively upon arrival, while those that are not matched by onReceive are stored into the receive queue and must be handled by a blocking call to neko.ActiveProtocol.Receive.
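    The split between reactive handling and the receive queue can be pictured with the self-contained sketch below. It does not use the neko classes: Msg, Ping, Pong, and DispatchSketch are hypothetical stand-ins, and the dispatch rule is only an assumption about how such a mechanism could work, not ScalaNeko's actual implementation.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Stand-in message types (hypothetical; the real ones live in package neko).
sealed trait Msg
case class Ping(from: Int) extends Msg
case class Pong(from: Int) extends Msg

// Hypothetical dispatcher illustrating the rule described above.
class DispatchSketch(onReceive: PartialFunction[Msg, Unit]) {
  // Messages that onReceive does not match wait here for a blocking receive.
  val receiveQueue = new LinkedBlockingQueue[Msg]()

  def deliver(m: Msg): Unit =
    if (onReceive.isDefinedAt(m)) onReceive(m) // handled reactively on arrival
    else receiveQueue.put(m)                   // kept for a later blocking receive
}

object DispatchDemo {
  // Returns (messages handled reactively, messages left in the queue).
  def run(): (List[Msg], List[Msg]) = {
    var reactive = List.empty[Msg]
    val d = new DispatchSketch({ case p: Ping => reactive = reactive :+ p })
    d.deliver(Ping(1))
    d.deliver(Pong(2))
    d.deliver(Ping(3))
    var queued = List.empty[Msg]
    while (!d.receiveQueue.isEmpty) queued = queued :+ d.receiveQueue.take()
    (reactive, queued)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

    In this sketch, only Ping is matched by onReceive, so the Pong is the one message that ends up in the queue.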

    2.2 Reactive protocols

    Most protocols in a process are reactive. A reactive protocol is usually sandwiched between a network and an application (or between a lower-level protocol and a higher-level one). The simplest way to implement one is by extending neko.ReactiveProtocol. Information flows in two directions: downstream and upstream. This is illustrated in the figure below.

             application
      |                      ^
      V                      |
    +----------------------------+
    | onSend        DELIVER(...) |
    |                            | Reactive protocol
    | SEND(...)        onReceive |
    +----------------------------+
      |                      ^
      V                      |
              network

    For the downstream flow (from application to network), the code of the protocol is implemented in the method neko.ReactiveProtocol.onSend, usually implemented as a scala.PartialFunction which reacts as appropriate to each event. The protocol can itself send messages through the neko.ReactiveProtocol.SEND method.

    For the upstream flow (from network to application), the code of the protocol is implemented in the method neko.ReactiveProtocol.onReceive, also implemented as a scala.PartialFunction which reacts appropriately to each incoming event. Events of a certain type are delivered to the protocol only if it registers for the event type by calling the neko.ReactiveProtocol.listenTo method on that event type. The protocol can deliver a message to the application through the method neko.ReactiveProtocol.DELIVER.

    Note that the two flows are not mutually exclusive. It is perfectly valid, and even frequent, for a protocol to call neko.ReactiveProtocol.DELIVER in neko.ReactiveProtocol.onSend, or to call neko.ReactiveProtocol.SEND in neko.ReactiveProtocol.onReceive.
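    The two flows can be sketched without the framework as follows. Everything here is a hypothetical stand-in: Ev, Data, Numbered, and NumberingLayer are illustration names, and the send and deliver callbacks merely play the roles that SEND and DELIVER have in a reactive protocol.

```scala
// Stand-in event types (hypothetical, not the neko classes).
sealed trait Ev
case class Data(payload: String) extends Ev
case class Numbered(sn: Int, inner: Data) extends Ev

// Hypothetical reactive layer: `send` points down (toward the network),
// `deliver` points up (toward the application).
class NumberingLayer(send: Ev => Unit, deliver: Ev => Unit) {
  private var nextSn = 0

  // Downstream flow: tag each outgoing Data with a sequence number.
  val onSend: PartialFunction[Ev, Unit] = {
    case d: Data =>
      send(Numbered(nextSn, d))
      nextSn += 1
  }

  // Upstream flow: strip the tag and hand the payload to the layer above.
  val onReceive: PartialFunction[Ev, Unit] = {
    case Numbered(_, d) => deliver(d)
  }
}

object ReactiveDemo {
  // Returns (events that went down to the wire, events delivered upward).
  def run(): (List[Ev], List[Ev]) = {
    var wire = List.empty[Ev]
    var app  = List.empty[Ev]
    val layer = new NumberingLayer(e => { wire = wire :+ e }, e => { app = app :+ e })
    layer.onSend(Data("a"))
    layer.onSend(Data("b"))
    wire.foreach(layer.onReceive) // loop the wire back up, as a network would
    (wire, app)
  }

  def main(args: Array[String]): Unit = println(run())
}
```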

    3. Defining new events (messages and signals)

    Let's start with a little bit of terminology. An event denotes anything that happens in the system and is represented by the abstract class neko.Event. Events can be of two types:

    • A signal is an event that occurs within one process, and can go from one protocol to another, but never cross process boundaries. It is represented by the subclasses of neko.Signal.
    • A message is an event that crosses process boundaries, but is typically (but not necessarily) interpreted by the same protocol in the target process. It is represented by the subclasses of neko.Message.

    A message can be "top-level" or a "wrapper". A top-level message is one that is created by the sending protocol. It has its own identity, as well as a source and destinations. In contrast, a wrapper is simply a shell that extends the information of an existing message. It retains the same identity, source, and destinations, but provides a shell to the message and can add its own information. This results in three types of messages:

    • A neko.MulticastMessage is a top-level message with multiple destinations. See the example below on how to define a new message:

      case class Snapshot(
          from: PID,
          to: Set[PID])
        extends MulticastMessage

      NB: The arguments *must* be named from and to.

    • A neko.UnicastMessage is a top-level message with a single destination. See the example below on how to define a new message:

      case class Token(
          from: PID,
          to: PID)
        extends UnicastMessage

      NB: The arguments *must* be named from and to.

    • A neko.Wrapper is a shell that wraps an existing message. A wrapper can wrap another wrapper, not only a top-level message. A wrapper preserves the identity, the source, and the destinations of the message it wraps.

      case class SequencedMessage(msg: Message, sn: Int) extends Wrapper(msg)
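    The way a wrapper preserves identity, source, and destinations, even when wrappers are nested, can be illustrated with the stand-alone sketch below. The classes SketchMessage, SketchWrapper, SketchToken, Sequenced, and Stamped are hypothetical, and representing the identity as a Long and a process as an Int is an assumption made for the illustration only.

```scala
// Hypothetical stand-in for a message: identity, source, destinations.
abstract class SketchMessage {
  def id: Long
  def from: Int
  def to: Set[Int]
}

// Top-level message: owns its identity, source, and destinations.
case class SketchToken(id: Long, from: Int, to: Set[Int]) extends SketchMessage

// Wrapper: forwards identity, source, and destinations to the wrapped message.
abstract class SketchWrapper(val inner: SketchMessage) extends SketchMessage {
  def id: Long     = inner.id
  def from: Int    = inner.from
  def to: Set[Int] = inner.to
}

// Two wrappers, each adding its own piece of information.
case class Sequenced(msg: SketchMessage, sn: Int)  extends SketchWrapper(msg)
case class Stamped(msg: SketchMessage, time: Long) extends SketchWrapper(msg)

object WrapperDemo {
  // Wraps a token twice and reads the preserved fields from the outer shell.
  def run(): (Long, Int, Set[Int]) = {
    val token  = SketchToken(id = 42L, from = 0, to = Set(1))
    val nested = Stamped(Sequenced(token, sn = 7), time = 100L)
    (nested.id, nested.from, nested.to)
  }

  def main(args: Array[String]): Unit = println(run())
}
```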

    4. Initialization of a process

    While processes are created automatically, their protocols are not, and must be initialized and connected. This is done through a process initializer, by providing an instance of neko.ProcessInitializer, whose sole role is to create the protocols of a process and combine them.

    ProcessInitializer { p =>
      val app  = new PingPong(p)
      val fifo = new FIFOChannel(p)
      app --> fifo
    }

    In the above example, each process is initialized by executing the code shown. The code creates two protocols and registers them with the object p given as argument (which represents the process being initialized). The two protocols are then connected so that all SEND operations of protocol app are handed to protocol fifo. The SEND operations of protocol fifo use the default target, which is the network interface of the process.

    It is also possible to initialize processes differently, by discriminating based on the identifier of the process to initialize. That identifier is obtained from the argument with p.pid.
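    As a rough illustration of discriminating on the process identifier, the stand-alone sketch below picks a different protocol stack for process 0. It does not use the neko API: SketchConfig, the Int-valued pid, and the protocol names "Coordinator" and "Worker" are all hypothetical, and protocols are represented by plain strings.

```scala
// Hypothetical stand-in for the initializer argument, with an Int pid.
case class SketchConfig(pid: Int)

object InitSketch {
  // Returns the protocol stack (top first) to build for the given process.
  def stackFor(p: SketchConfig): List[String] =
    if (p.pid == 0) List("Coordinator", "FIFOChannel") // process 0 gets a special role
    else            List("Worker", "FIFOChannel")      // every other process

  // Builds the stacks of processes 0 until n.
  def run(n: Int): List[List[String]] =
    (0 until n).toList.map(i => stackFor(SketchConfig(i)))

  def main(args: Array[String]): Unit = run(3).foreach(println)
}
```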

    5. Setting up a new system

    A new instance of a ScalaNeko system is created and configured by creating an object that extends neko.Main. The resulting object becomes a main object and is thus executable (neko.Main is a subclass of scala.App).

    Class neko.Main requires parameters such as the network topology and the process initializer, as illustrated below:

    object PingPongApp extends Main(topology.Clique(3))( ProcessInitializer { p=> ... } )

    Future versions of ScalaNeko are planned to support many more parameters, such as the network topologyDescriptor.

    References

    1. Péter Urbán, Xavier Défago, André Schiper: Neko: A Single Environment to Simulate and Prototype Distributed Algorithms. J. Inf. Sci. Eng. 18(6): 981-997 (2002).

    Contributors

    Lead architect: Xavier Défago

    Other contributors:

    • Naoyuki Onuki (trace system; integration with NekoViewer)

neko.kernel

ActivityManager

class ActivityManager extends Mutable with LazyLogging

handles the lockstep execution of registered concurrent activities (protocol objects that implement the trait ManagedActivity).

The activities are assumed to exchange information using a producer-consumer model such as java.util.concurrent.BlockingQueue. The model is not safe if an activity is actually made to wait on the queue (see details at the bottom).

Initialization:

  1. new activities are registered by calling registerActivity, which provides an identifier (ActivityID).
  2. after all activities have been registered, the activity manager is started by calling start.
  3. the activities are started just like normal threads.

The lifecycle of an activity is as follows:

  1. upon starting, the activity first calls willStart once, and starts its concurrent execution as a thread.
  2. Each time the activity would be required to wait on a queue (checked by polling on the structure), it will instead wait through a call to willWait, and proceed to do the blocking call only once it is sure that it will not be made to wait.
  3. Once the activity has finished its normal execution, it calls willFinish which blocks until all other threads have finished.

After all activities have started, the lockstep execution proceeds as follows:

  1. all activities execute concurrently until all of them are blocked on a call to willWait or willFinish.
  2. the manager executes all registered actions (i.e., actions registered through registerAction) sequentially, and in mutual exclusion with the activities.
  3. after all actions have been executed once, the cycle repeats if some activity has called willWait, or the cycle ends if all activities have called willFinish.
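
The cycle above can be approximated with a standard java.util.concurrent.CyclicBarrier, whose barrier action runs once per round while every participating thread is parked. This is a swapped-in technique chosen for illustration, not the way ActivityManager is implemented; LockstepSketch and simulate are hypothetical names. Entry i of steps is the number of rounds in which activity i works before it finishes, and the list must be non-empty.

```scala
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.collection.mutable.ListBuffer

object LockstepSketch {
  /** Runs one thread per entry of `steps` and returns, for every round,
    * the number of activities that had already finished at that point. */
  def simulate(steps: List[Int]): List[Int] = {
    val n        = steps.size
    val finished = new AtomicInteger(0)
    val allDone  = new AtomicBoolean(false)
    val log      = ListBuffer.empty[Int] // touched only by the barrier action

    // The barrier action stands in for the registered actions: it runs once
    // per round, in a single thread, while all activities are blocked.
    val barrier = new CyclicBarrier(n, new Runnable {
      def run(): Unit = {
        val f = finished.get()
        log += f
        if (f == n) allDone.set(true) // every activity has finished: last round
      }
    })

    val threads = steps.map { k =>
      new Thread(new Runnable {
        def run(): Unit = {
          for (_ <- 1 to k) barrier.await()      // plays the role of willWait
          finished.incrementAndGet()
          while (!allDone.get()) barrier.await() // plays the role of willFinish
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    log.toList
  }

  def main(args: Array[String]): Unit = println(simulate(List(1, 3, 2)))
}
```

A finished activity keeps arriving at the barrier until the action observes that all activities have finished, which mirrors the blocking behavior described for willFinish.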

Additional notes on synchronization

As said before, activities must never actually wait on a queue. This means that the queue must provide a means of polling (such as java.util.concurrent.BlockingQueue.isEmpty), so that an activity can make sure that its thread will not be blocked by the queue. Practically speaking, this means that:

  • some scala synchronization primitives like scala.concurrent.SyncVar or scala.concurrent.SyncChannel can't be used safely.
  • the queue can have only one consumer.
  • the queue must either be unbounded (e.g., java.util.concurrent.LinkedBlockingQueue as used in the implementation of neko.ActiveProtocol) or it must have a reserved location for every producer activity.
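
A minimal sketch of the poll-before-take pattern follows, assuming a single consumer and an unbounded queue. WaitHook is a hypothetical, simplified stand-in for the manager (the real willWait takes an ActivityID), and PollingSketch is an illustration name.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-in for the part of the manager an activity talks to.
trait WaitHook { def willWait(): Unit }

object PollingSketch {
  /** Takes one element without ever blocking inside the queue itself:
    * while the queue is empty, the caller waits through `hook` instead.
    * Safe only with a single consumer, since no other thread may drain
    * the queue between the isEmpty check and the take. */
  def receive[A](queue: LinkedBlockingQueue[A], hook: WaitHook): A = {
    while (queue.isEmpty) hook.willWait()
    queue.take() // cannot block: the queue is non-empty and we are the only consumer
  }

  // Returns the received element and the number of willWait calls made.
  def demo(): (String, Int) = {
    val queue = new LinkedBlockingQueue[String]()
    var waits = 0
    // Test hook: counts calls and, on the second one, plays the producer.
    val hook = new WaitHook {
      def willWait(): Unit = {
        waits += 1
        if (waits == 2) queue.put("hello")
      }
    }
    (receive(queue, hook), waits)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```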
Attributes
protected[neko]
Linear Supertypes
LazyLogging, Mutable, AnyRef, Any

Instance Constructors

  1. new ActivityManager(system: NekoSystem)

Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##(): Int
    Definition Classes
    AnyRef → Any
  3. def +(other: String): String
    Implicit
    This member is added by an implicit conversion from ActivityManager to any2stringadd[ActivityManager] performed by method any2stringadd in scala.Predef.
    Definition Classes
    any2stringadd
  4. def ->[B](y: B): (ActivityManager, B)
    Implicit
    This member is added by an implicit conversion from ActivityManager to ArrowAssoc[ActivityManager] performed by method ArrowAssoc in scala.Predef.
    Definition Classes
    ArrowAssoc
    Annotations
    @inline()
  5. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  6. def abnormallyTerminated: Option[Set[String]]
  7. def allActivitiesFinished: Boolean

    returns true if all activities have finished.

    An activity is considered having finished if it has called the method willFinish.

    returns

    whether all activities have finished.

  8. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  9. def clone(): AnyRef
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( ... ) @native() @HotSpotIntrinsicCandidate()
  10. def ensuring(cond: (ActivityManager) ⇒ Boolean, msg: ⇒ Any): ActivityManager
    Implicit
    This member is added by an implicit conversion from ActivityManager to Ensuring[ActivityManager] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  11. def ensuring(cond: (ActivityManager) ⇒ Boolean): ActivityManager
    Implicit
    This member is added by an implicit conversion from ActivityManager to Ensuring[ActivityManager] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  12. def ensuring(cond: Boolean, msg: ⇒ Any): ActivityManager
    Implicit
    This member is added by an implicit conversion from ActivityManager to Ensuring[ActivityManager] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  13. def ensuring(cond: Boolean): ActivityManager
    Implicit
    This member is added by an implicit conversion from ActivityManager to Ensuring[ActivityManager] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  14. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  15. def equals(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  16. def formatted(fmtstr: String): String
    Implicit
    This member is added by an implicit conversion from ActivityManager to StringFormat[ActivityManager] performed by method StringFormat in scala.Predef.
    Definition Classes
    StringFormat
    Annotations
    @inline()
  17. final def getClass(): Class[_]
    Definition Classes
    AnyRef → Any
    Annotations
    @native() @HotSpotIntrinsicCandidate()
  18. def hasPendingMessages: Boolean

    returns true if some activity is waiting and has pending messages.

  19. def hashCode(): Int
    Definition Classes
    AnyRef → Any
    Annotations
    @native() @HotSpotIntrinsicCandidate()
  20. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  21. def join(): Unit

    waits until all activities have finished.

    This method allows an external thread to synchronize on the manager and wait until the lockstep execution has finished. Beware that this method cannot be called by any of the registered activities, as this would result in a deadlock.

  22. lazy val logger: Logger
    Attributes
    protected
    Definition Classes
    LazyLogging
    Annotations
    @transient()
  23. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  24. final def notify(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native() @HotSpotIntrinsicCandidate()
  25. final def notifyAll(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native() @HotSpotIntrinsicCandidate()
  26. def registerAction(action: () ⇒ Unit): Unit
  27. def registerActivity(activeProtocol: ManagedActivity): ActivityID
  28. def reset(): Unit
  29. def start(whenDone: ⇒ Unit): Unit

    starts the execution of the activity manager.

    After calling this method, it is no longer possible to register new activities (i.e., registerActivity) or to add new actions (i.e., registerAction); any such attempt will result in either an exception or an error log message.

  30. final def synchronized[T0](arg0: ⇒ T0): T0
    Definition Classes
    AnyRef
  31. val system: NekoSystem
  32. def toString(): String
    Definition Classes
    AnyRef → Any
  33. def unfinishedActivities: String

    returns a string that lists all registered activities that haven't yet finished. This is intended for debugging and logging purposes.

    returns

    a string listing all registered activities that haven't yet finished

  34. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  35. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... ) @native()
  36. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  37. def willFinish(id: ActivityID): Unit

    an activity indicates that it has finished its execution and will be waiting until all activities have done so.

    The call is blocking until all other activities have also finished.

    id

    identifier of the activity

  38. def willStart(id: ActivityID): Unit

    an activity indicates that it is ready to start its execution.

    The call is non-blocking.

    id

    identifier of the activity

  39. def willWait(id: ActivityID): Unit

    an activity indicates that it will be waiting for a condition to be satisfied (i.e., it is polling on the condition).

    The call puts the thread to sleep for a while; the condition must be checked again afterwards.

    id

    identifier of the activity

  40. def →[B](y: B): (ActivityManager, B)
    Implicit
    This member is added by an implicit conversion from ActivityManager to ArrowAssoc[ActivityManager] performed by method ArrowAssoc in scala.Predef.
    Definition Classes
    ArrowAssoc

Deprecated Value Members

  1. def finalize(): Unit
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( classOf[java.lang.Throwable] ) @Deprecated @deprecated
    Deprecated

    (Since version ) see corresponding Javadoc for more information.
