Fsharp.Actor


Patterns

Create actors that represent a common coding pattern.

Dispatch

Round robin

Round robin dispatch, distributes messages in a round robin fashion to its workers.

let createWorker i =
    Actor.spawn (Actor.Options.Create(sprintf "workers/worker_%d" i)) (fun (actor:IActor<int>) ->
        let log = (actor :?> Actor.T<int>).Log
        let rec loop() = 
            async {
                let! (msg,_) = actor.Receive()
                do log.Debug(sprintf "Actor %A recieved work %d" actor msg, None)
                do! Async.Sleep(5000)
                do log.Debug(sprintf "Actor %A finshed work %d" actor msg, None)
                return! loop()
            }
        loop()
    )

let workers = [|1..10|] |> Array.map createWorker
let rrrouter = Patterns.Dispatch.roundRobin<int> "workers/routers/roundRobin" workers

[1..10] |> List.iter ((<--) rrrouter)

Shortest Queue

Shortest queue, attempts to find the worker with the shortest queue and distributes work to them. For constant time work packs this will approximate to round robin routing.

Using the workers defined above we can define another dispatcher but this time using the shortest queue dispatch strategy

let sqrouter = Patterns.Dispatch.shortestQueue "workers/routers/shortestQ" workers

[1..100] |> List.iter ((<--) sqrouter)
val createWorker : i:int -> IActor

Full name: Patterns.createWorker
val i : int
module Actor

from FSharp.Actor
val spawn : options:Actor.Options<'a> -> computation:(IActor<'a> -> Async<unit>) -> IActor

Full name: FSharp.Actor.Actor.spawn
type Options<'a> =
  {Id: string;
   Mailbox: IMailbox<ActorMessage<'a>>;
   Supervisor: IActor<SupervisorMessage> option;
   Logger: ILogger;
   Path: ActorPath;}
  static member Create : ?id:string * ?mailbox:IMailbox<ActorMessage<'a>> * ?supervisor:IActor<SupervisorMessage> * ?logger:ILogger * ?address:ActorPath -> Options<'a>
  static member Default : Options<'a>

Full name: FSharp.Actor.Actor.Options<_>
static member Actor.Options.Create : ?id:string * ?mailbox:IMailbox<ActorMessage<'a>> * ?supervisor:IActor<SupervisorMessage> * ?logger:ILogger * ?address:ActorPath -> Actor.Options<'a>
val sprintf : format:Printf.StringFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
val actor : IActor<int>
Multiple items
type IActor =
  interface
    abstract member Link : IActor -> unit
    abstract member Post : obj * IActor option -> unit
    abstract member PostSystemMessage : SystemMessage * IActor option -> unit
    abstract member Start : unit -> unit
    abstract member UnLink : IActor -> unit
    abstract member UnWatch : unit -> unit
    abstract member Watch : IActor -> unit
    abstract member add_OnRestarted : Handler<IActor> -> unit
    abstract member add_OnStarted : Handler<IActor> -> unit
    abstract member add_OnStopped : Handler<IActor> -> unit
    ...
  end

Full name: FSharp.Actor.Types.IActor

--------------------
type IActor<'a> =
  interface
    inherit IActor
    abstract member Post : 'a -> unit
    abstract member Post : 'a * IActor option -> unit
    abstract member Receive : int option -> Async<'a * IActor option>
    abstract member Receive : unit -> Async<'a * IActor option>
  end

Full name: FSharp.Actor.Types.IActor<_>
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
val log : ILogger
type T<'a> =
  interface IActor<'a>
  private new : computation:(IActor<'a> -> Async<unit>) * ?options:Options<'a> -> T<'a>
  override Equals : y:obj -> bool
  override GetHashCode : unit -> int
  override ToString : unit -> string
  member Log : ILogger

Full name: FSharp.Actor.Actor.T<_>
val loop : (unit -> Async<'a>)
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val msg : int
abstract member IActor.Receive : unit -> Async<'a * IActor option>
abstract member IActor.Receive : int option -> Async<'a * IActor option>
abstract member ILogger.Debug : string * exn option -> unit
union case Option.None: Option<'T>
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
val workers : IActor []

Full name: Patterns.workers
module Array

from Microsoft.FSharp.Collections
val map : mapping:('T -> 'U) -> array:'T [] -> 'U []

Full name: Microsoft.FSharp.Collections.Array.map
val rrrouter : IActor

Full name: Patterns.rrrouter
module Patterns

from FSharp.Actor
module Dispatch

from FSharp.Actor.Patterns
val roundRobin<'a> : name:string -> refs:IActor [] -> IActor

Full name: FSharp.Actor.Patterns.Dispatch.roundRobin
Multiple items
module List

from Microsoft.FSharp.Collections

--------------------
type List<'T> =
  | ( [] )
  | ( :: ) of Head: 'T * Tail: 'T list
  interface IEnumerable
  interface IEnumerable<'T>
  member Head : 'T
  member IsEmpty : bool
  member Item : index:int -> 'T with get
  member Length : int
  member Tail : 'T list
  static member Cons : head:'T * tail:'T list -> 'T list
  static member Empty : 'T list

Full name: Microsoft.FSharp.Collections.List<_>
val iter : action:('T -> unit) -> list:'T list -> unit

Full name: Microsoft.FSharp.Collections.List.iter
val sqrouter : IActor

Full name: Patterns.sqrouter
val shortestQueue : name:string -> refs:seq<IActor> -> IActor

Full name: FSharp.Actor.Patterns.Dispatch.shortestQueue
Fork me on GitHub