class ActivityManager extends Mutable with LazyLogging
handles the lockstep execution of registered concurrent activities (protocol objects that implement the trait ManagedActivity).
The activities are assumed to exchange information using a producer-consumer model such as java.util.concurrent.BlockingQueue. The model is not safe if an activity is actually made to wait on the queue (see details at the bottom).
Initialization:
- new activities are registered by calling registerActivity, which provides an identifier (ActivityID).
- after all activities have been registered, the activity manager is started by calling start.
- the activities are started just like normal threads.
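The ordering constraint in these steps (register everything first, then start) can be sketched with a small stand-in. `InitializationSketch` and `Registry` are hypothetical names and not part of neko; the identifier is a plain `Int` here rather than an ActivityID. The documentation of start says a late registration results in either an exception or an error log message, and this sketch arbitrarily picks the exception.

```scala
import scala.collection.mutable.ArrayBuffer

object InitializationSketch {
  // Hypothetical stand-in for the manager's registration phase; not the neko implementation.
  class Registry {
    private val names = ArrayBuffer.empty[String]
    private var started = false

    // Returns an identifier for the newly registered activity (an Int here, an ActivityID in neko).
    def registerActivity(name: String): Int = synchronized {
      // Assumption: reject late registrations with an exception rather than an error log.
      if (started) throw new IllegalStateException(s"cannot register $name after start")
      names += name
      names.size - 1
    }

    // After this call, registerActivity is rejected.
    def start(): Unit = synchronized { started = true }

    def registered: List[String] = synchronized { names.toList }
  }
}
```

With this stand-in, two registrations followed by `start()` succeed, and a third registration attempted afterwards throws.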
The lifecycle of an activity is as follows:
- upon starting, the activity first calls willStart once, and starts its concurrent execution as a thread.
- Each time the activity would be required to wait on a queue (checked by polling on the structure), it will instead wait through a call to willWait, and proceed to do the blocking call only once it is sure that it will not be made to wait.
- Once the activity has finished its normal execution, it calls willFinish which blocks until all other threads have finished.
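The lifecycle above can be sketched as a single receive loop. Everything named here (`LifecycleSketch`, `LifecycleHooks`, `RecordingHooks`, `runActivity`) is a hypothetical stand-in: the hooks only record calls and do no synchronization, and identifiers are plain `Int` values rather than ActivityID. The point is the shape of the loop: poll first, wait through willWait while the queue is empty, and call take only when it cannot block.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

object LifecycleSketch {
  // Hypothetical stand-in for the three lifecycle calls; neko uses ActivityID, not Int.
  trait LifecycleHooks {
    def willStart(id: Int): Unit
    def willWait(id: Int): Unit
    def willFinish(id: Int): Unit
  }

  // Records each call; onWait simulates what other activities do while this one waits.
  class RecordingHooks(onWait: () => Unit) extends LifecycleHooks {
    val calls = ListBuffer.empty[String]
    def willStart(id: Int): Unit = { calls += "willStart"; () }
    def willWait(id: Int): Unit = { calls += "willWait"; onWait() }
    def willFinish(id: Int): Unit = { calls += "willFinish"; () }
  }

  // Receives `expected` messages without ever blocking on the queue itself.
  def runActivity(id: Int, hooks: LifecycleHooks,
                  inbox: LinkedBlockingQueue[String], expected: Int): List[String] = {
    hooks.willStart(id)
    val received = ListBuffer.empty[String]
    while (received.size < expected) {
      // Poll first: while the queue is empty, wait through the hooks instead.
      while (inbox.isEmpty) hooks.willWait(id)
      // Non-empty queue with a single consumer: take() returns immediately.
      received += inbox.take()
    }
    hooks.willFinish(id)
    received.toList
  }
}
```

The inner `while` re-checks the condition after every willWait, which matches the note on willWait that the condition must be checked again after the call returns.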
After all activities have started, the lockstep execution proceeds as follows:
- all activities execute concurrently until all of them are blocked on a call to willWait or willFinish.
- the manager executes all registered actions (i.e., actions registered through registerAction) sequentially, and in mutual exclusion with the activities.
- after all actions have been executed once, the cycle repeats if some activity has called willWait, or the cycle ends if all activities have called willFinish.
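As an analogy only, the cycle above resembles a java.util.concurrent.CyclicBarrier with a barrier action: workers run concurrently, block at the barrier, and the action runs once while all of them are blocked. This is not how ActivityManager is implemented, and `LockstepSketch` and `simulate` are illustrative names. The sketch also assumes that every worker runs the same number of rounds, which the real manager does not require.

```scala
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.atomic.AtomicInteger

object LockstepSketch {
  // Runs `workers` threads for `rounds` rounds; returns (units of work done, action runs).
  def simulate(workers: Int, rounds: Int): (Int, Int) = {
    val work = new AtomicInteger(0)
    // Only touched by the barrier action, which runs while all workers are blocked.
    var actionRuns = 0
    // The barrier action runs once per round, after the last worker arrives
    // and before any worker is released.
    val barrier = new CyclicBarrier(workers, new Runnable {
      def run(): Unit = actionRuns += 1
    })
    val threads = (0 until workers).map { _ =>
      new Thread(new Runnable {
        def run(): Unit =
          for (_ <- 0 until rounds) {
            work.incrementAndGet() // concurrent phase
            barrier.await()        // stand-in for blocking in willWait or willFinish
          }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join()) // an external thread waits for the whole run to end
    (work.get, actionRuns)
  }
}
```

Because the action runs only while every worker is parked at the barrier, it needs no extra locking, which mirrors the mutual exclusion between registered actions and activities.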
Additional notes on synchronization
As noted above, activities can't be made to actually wait on a queue. This means that the queue must provide a means for polling (such as java.util.concurrent.BlockingQueue.isEmpty), to make sure that the thread will not actually be blocked by the queue. Practically speaking, this means that:
- some Scala synchronization primitives like scala.concurrent.SyncVar or scala.concurrent.SyncChannel can't be used safely.
- the queue can have only one consumer.
- the queue must either be unbounded (e.g., java.util.concurrent.LinkedBlockingQueue as used in the implementation of neko.ActiveProtocol) or it must have a reserved location for every producer activity.
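The producer-side constraint can be illustrated with plain java.util.concurrent queues. In this sketch (`QueueCapacitySketch` and `acceptedWithoutBlocking` are illustrative names, not part of neko), offer is used instead of put because it reports a full queue by returning false rather than blocking. Every rejected offer marks a point where put would have made the producer wait on the queue.

```scala
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, LinkedBlockingQueue}

object QueueCapacitySketch {
  // Counts how many of `attempts` insertions succeed without blocking.
  // offer returns false on a full queue, where put would block the producer.
  def acceptedWithoutBlocking(queue: BlockingQueue[Int], attempts: Int): Int =
    (1 to attempts).count(i => queue.offer(i))

  // A bounded queue with two slots rejects the third and later insertions.
  val bounded: Int = acceptedWithoutBlocking(new ArrayBlockingQueue[Int](2), 5)

  // A LinkedBlockingQueue built with the no-argument constructor is effectively unbounded.
  val unbounded: Int = acceptedWithoutBlocking(new LinkedBlockingQueue[Int](), 5)
}
```

Here the bounded queue accepts 2 of 5 insertions while the unbounded one accepts all 5, so a producer using the unbounded queue is never made to wait.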
- Attributes: protected[neko]
Instance Constructors
- new ActivityManager(system: NekoSystem)
Value Members
- final def !=(arg0: Any): Boolean
  - Definition Classes: AnyRef → Any
- final def ##(): Int
  - Definition Classes: AnyRef → Any
- def +(other: String): String
  - Implicit: This member is added by an implicit conversion from ActivityManager to any2stringadd[ActivityManager] performed by method any2stringadd in scala.Predef.
  - Definition Classes: any2stringadd
- def ->[B](y: B): (ActivityManager, B)
  - Implicit: This member is added by an implicit conversion from ActivityManager to ArrowAssoc[ActivityManager] performed by method ArrowAssoc in scala.Predef.
  - Definition Classes: ArrowAssoc
  - Annotations: @inline()
- final def ==(arg0: Any): Boolean
  - Definition Classes: AnyRef → Any
- def abnormallyTerminated: Option[Set[String]]
- def allActivitiesFinished: Boolean
  returns true if all activities have finished. An activity is considered to have finished once it has called the method willFinish.
  - returns: whether all activities have finished.
- final def asInstanceOf[T0]: T0
  - Definition Classes: Any
- def clone(): AnyRef
  - Attributes: protected[lang]
  - Definition Classes: AnyRef
  - Annotations: @throws( ... ) @native() @HotSpotIntrinsicCandidate()
- def ensuring(cond: (ActivityManager) ⇒ Boolean, msg: ⇒ Any): ActivityManager
  - Implicit: This member is added by an implicit conversion from ActivityManager to Ensuring[ActivityManager] performed by method Ensuring in scala.Predef.
  - Definition Classes: Ensuring
- def ensuring(cond: (ActivityManager) ⇒ Boolean): ActivityManager
  - Implicit: This member is added by an implicit conversion from ActivityManager to Ensuring[ActivityManager] performed by method Ensuring in scala.Predef.
  - Definition Classes: Ensuring
- def ensuring(cond: Boolean, msg: ⇒ Any): ActivityManager
  - Implicit: This member is added by an implicit conversion from ActivityManager to Ensuring[ActivityManager] performed by method Ensuring in scala.Predef.
  - Definition Classes: Ensuring
- def ensuring(cond: Boolean): ActivityManager
  - Implicit: This member is added by an implicit conversion from ActivityManager to Ensuring[ActivityManager] performed by method Ensuring in scala.Predef.
  - Definition Classes: Ensuring
- final def eq(arg0: AnyRef): Boolean
  - Definition Classes: AnyRef
- def equals(arg0: Any): Boolean
  - Definition Classes: AnyRef → Any
- def formatted(fmtstr: String): String
  - Implicit: This member is added by an implicit conversion from ActivityManager to StringFormat[ActivityManager] performed by method StringFormat in scala.Predef.
  - Definition Classes: StringFormat
  - Annotations: @inline()
- final def getClass(): Class[_]
  - Definition Classes: AnyRef → Any
  - Annotations: @native() @HotSpotIntrinsicCandidate()
- def hasPendingMessages: Boolean
  returns true if some activity is waiting and has pending messages.
- def hashCode(): Int
  - Definition Classes: AnyRef → Any
  - Annotations: @native() @HotSpotIntrinsicCandidate()
- final def isInstanceOf[T0]: Boolean
  - Definition Classes: Any
- def join(): Unit
  waits until all activities have finished. This method allows an external thread to synchronize on the manager and wait until the lockstep execution has finished. Beware that this method must not be called by any of the registered activities, as this would result in a deadlock.
- lazy val logger: Logger
  - Attributes: protected
  - Definition Classes: LazyLogging
  - Annotations: @transient()
- final def ne(arg0: AnyRef): Boolean
  - Definition Classes: AnyRef
- final def notify(): Unit
  - Definition Classes: AnyRef
  - Annotations: @native() @HotSpotIntrinsicCandidate()
- final def notifyAll(): Unit
  - Definition Classes: AnyRef
  - Annotations: @native() @HotSpotIntrinsicCandidate()
- def registerAction(action: () ⇒ Unit): Unit
- def registerActivity(activeProtocol: ManagedActivity): ActivityID
- def reset(): Unit
- def start(whenDone: ⇒ Unit): Unit
  starts the execution of the activity manager. After calling this method, it is no longer possible to register new activities (i.e., registerActivity), nor to add any new actions (i.e., registerAction), and any such attempt will either result in an exception or an error log message.
- final def synchronized[T0](arg0: ⇒ T0): T0
  - Definition Classes: AnyRef
- val system: NekoSystem
- def toString(): String
  - Definition Classes: AnyRef → Any
- def unfinishedActivities: String
  returns a string that lists all registered activities that haven't yet finished. This is intended for debugging and logging purposes.
  - returns: a string listing all registered activities that haven't yet finished
- final def wait(arg0: Long, arg1: Int): Unit
  - Definition Classes: AnyRef
  - Annotations: @throws( ... )
- final def wait(arg0: Long): Unit
  - Definition Classes: AnyRef
  - Annotations: @throws( ... ) @native()
- final def wait(): Unit
  - Definition Classes: AnyRef
  - Annotations: @throws( ... )
- def willFinish(id: ActivityID): Unit
  an activity indicates that it has finished its execution and will be waiting until all activities have done so. The call blocks until all other activities have also finished.
  - id: identifier of the activity
- def willStart(id: ActivityID): Unit
  an activity indicates that it is ready to start its execution. The call is non-blocking.
  - id: identifier of the activity
- def willWait(id: ActivityID): Unit
  an activity indicates that it will be waiting for a condition to be satisfied (i.e., it is polling on the condition). The call puts the thread to sleep for a while, after which the condition must be checked again.
  - id: identifier of the activity
- def →[B](y: B): (ActivityManager, B)
  - Implicit: This member is added by an implicit conversion from ActivityManager to ArrowAssoc[ActivityManager] performed by method ArrowAssoc in scala.Predef.
  - Definition Classes: ArrowAssoc