LAgent: an agent framework in F# – Part VIII - Implementing MapReduce (user model)

Download framework here.

All posts are here:

For this post I use a newer version of the framework that I just uploaded to CodeGallery. While using LAgent I grew more and more unhappy with the weakly typed way of sending messages. The code that implements that feature is nasty: full of upcasts and downcasts. I was losing faith in it. Bugs were cropping up in all sorts of scenarios (e.g. using generic union types as messages).

In the end I decided to re-architect the framework to make it strongly typed. In essence, each agent can now receive messages of a single type only. The limitations that this design choice introduces (e.g. more limited hot swapping) are compensated by catching errors at compile time and by the streamlining of the code. I left the old framework on the site in case you disagree with me.

In any case, today’s post is about MapReduce. It assumes that you know what it is (the original Google paper that served as inspiration is here: Google Research Publication - MapReduce). What would it take to implement an in-memory MapReduce using my agent framework?

Let’s start with the user model.

let mapReduce   (inputs:seq<'in_key * 'in_value>)
                (map:'in_key -> 'in_value -> seq<'out_key * 'out_value>)
                (reduce:'out_key -> seq<'out_value> -> seq<'reducedValues>)
                outputAgent
                M R partitionF =                

mapReduce takes seven parameters:

  1. inputs: a sequence of input key/value pairs.
  2. map: this function operates on each input key/value pair. It returns a sequence of output key/value pairs. The type of the output sequence can be different from the type of the inputs.
  3. reduce: this function operates on an output key and all the values associated with it. It returns a sequence of reduced values (e.g. the average of all the values for this key).
  4. outputAgent: this is the agent that gets notified every time a new output key has been reduced, and once more when the whole operation ends.
  5. M: how many mapper agents to instantiate
  6. R: how many reducer agents to instantiate
  7. partitionF: the partition function used to choose which of the reducers is associated with a key
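To pin down what the first three parameters promise, here is a minimal sequential sketch of the same contract. It uses no agents and no parallelism, so it says nothing about how LAgent implements mapReduce. The name mapReduceSeq and the tiny in-memory "files" are assumptions made for illustration only.

```fsharp
// Sequential reference sketch of the MapReduce contract (no agents involved).
// `mapReduceSeq` is a hypothetical name, not part of LAgent.
let mapReduceSeq (inputs: seq<'inKey * 'inValue>)
                 (map: 'inKey -> 'inValue -> seq<'outKey * 'outValue>)
                 (reduce: 'outKey -> seq<'outValue> -> seq<'reduced>)
                 : seq<'outKey * seq<'reduced>> =
    inputs
    // Map phase: every input pair produces zero or more intermediate pairs.
    |> Seq.collect (fun (k, v) -> map k v)
    // Shuffle phase: gather all intermediate values that share an output key.
    |> Seq.groupBy fst
    // Reduce phase: one call to `reduce` per distinct output key.
    |> Seq.map (fun (key, pairs) -> key, reduce key (Seq.map snd pairs))

// Usage: count words in two small in-memory "files".
let counts =
    mapReduceSeq
        [ "File1", "a b a"; "File2", "b c" ]
        (fun _ (content: string) -> content.Split(' ') |> Seq.map (fun w -> w, 1))
        (fun _ values -> Seq.singleton (Seq.sum values))
    |> Seq.map (fun (word, sums) -> word, Seq.head sums)
    |> Map.ofSeq
// counts maps "a" to 2, "b" to 2 and "c" to 1
```

The agent version differs in the remaining four parameters: M and R decide how many agents share the map and reduce phases, partitionF decides which reducer owns a key, and outputAgent receives results as they become available instead of a returned sequence.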

Let’s look at how to use this function to find how often each word is used in a set of files. First a simple partition function can be defined as:

let partitionF = fun (key:'a) M -> abs(key.GetHashCode()) % M

Given a key and a number of buckets, it picks one of the buckets. Its type is: 'a -> int -> int, so it’s fairly reusable.
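As a small illustration of what "picks one of the buckets" means, the sketch below assigns a few words to two reducers. It is a variant, not the function above: the name partition is hypothetical, it uses F#'s built-in hash instead of GetHashCode, and it normalizes the remainder arithmetically instead of calling abs. The concrete bucket a word lands in depends on the runtime's string hashing, so no bucket numbers are shown.

```fsharp
// Hypothetical variant of a partition function: maps a key to a bucket in [0, buckets).
let partition key (buckets: int) : int =
    let h = hash key
    // (h % n + n) % n stays in [0, n) even when the hash code is negative.
    ((h % buckets) + buckets) % buckets

// Assign each word to one of two reducers; equal keys always get the same bucket.
let words = [ "I"; "was"; "going"; "home"; "I" ]
let assignment = words |> List.map (fun w -> w, partition w 2)
```

What matters for MapReduce is only that the same key always maps to the same reducer, and that the result is a valid reducer index.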

Let’s also create a basic agent that just prints out the reduced values:

let printer = spawnWorker (fun msg ->
                            match msg with
                            | Reduced(key, value)   -> printfn "%A %A" key value
                            | MapReduceDone         -> printfn "All done!!")

The agent gets notified whenever a new key is reduced and when the algorithm ends. It is useful to be notified immediately instead of waiting for everything to be done. If I hadn’t written this code using agents I would not have realized that possibility. I would simply have framed the problem as a function that takes an input and returns an output. Agents force you to think explicitly about the parallelism in your app. That’s a good thing.

The mapping function simply splits the content of a file into words and adds a word/1 pair to the list for each of them. I know that there are much better ways to do this (e.g. regular expressions for the parsing, and summing word counts inside the function), but I wanted to test the basic framework capabilities, and doing it this way exercises them better.

open System.Collections.Generic

let map = fun (fileName:string) (fileContent:string) ->
            let l = new List<string * int>()
            // punctuation plus whitespace/control characters act as word delimiters
            let wordDelims = [|' ';',';';';'.';':';'?';'!';'(';')';'\n';'\t';'\f';'\r';'\b'|]
            fileContent.Split(wordDelims) |> Seq.iter (fun word -> l.Add((word, 1)))
            l :> seq<string * int>

The reducer function simply sums the various word statistics sent by the mappers:

let reduce = fun key (values:seq<int>) -> [values |> Seq.sum] |> seq<int>

Now we can create some fake input to check that it works:

let testInput = ["File1", "I was going to the airport when I saw someone crossing";
"File2", "I was going home when I saw you coming toward me"]

And execute the mapReduce:

mapReduce testInput map reduce printer 2 2 partitionF

On my machine I get the following. You might get a different order because of the async/parallel processing involved. If I wanted a stable order I would need to change the printer agent to cache results on Reduced and process them on MapReduceDone (see next post).

"I" [4]
"crossing" [1]
"going" [2]
"home" [1]
"me" [1]
"the" [1]
"toward" [1]
"airport" [1]
"coming" [1]
"saw" [2]
"someone" [1]
"to" [1]
"was" [2]
"when" [2]
"you" [1]
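The caching idea mentioned above can be sketched without LAgent, using F#'s standard MailboxProcessor. Everything here is an assumption made for illustration: the Output type is a stand-in for the framework's real message type, sortingCollector is a hypothetical name, and MapReduceDone carries a reply channel only so that the caller can wait for the sorted result.

```fsharp
// Hypothetical message type; LAgent's actual Reduced/MapReduceDone definitions may differ.
type Output<'key, 'value> =
    | Reduced of 'key * seq<'value>
    | MapReduceDone of AsyncReplyChannel<('key * 'value list) list>

// Caches every Reduced message and replies with the results sorted by key once done.
let sortingCollector () =
    MailboxProcessor<Output<string, int>>.Start(fun inbox ->
        let rec loop cache =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Reduced (key, values) ->
                    // Keep accumulating; nothing is printed yet.
                    return! loop ((key, List.ofSeq values) :: cache)
                | MapReduceDone reply ->
                    // All keys have arrived: hand back a stable, sorted view.
                    reply.Reply(List.sortBy fst cache)
            }
        loop [])

// Usage: messages arrive in arbitrary order, the result comes back sorted.
let collector = sortingCollector ()
collector.Post(Reduced("was", seq [2]))
collector.Post(Reduced("I", seq [4]))
collector.Post(Reduced("home", seq [1]))
let sorted = collector.PostAndReply(fun channel -> MapReduceDone channel)
sorted |> List.iter (fun (key, values) -> printfn "%A %A" key values)
```

F# compares strings ordinally, so uppercase keys such as "I" sort before lowercase ones. The price of the stable order is that nothing is shown until the whole run has finished.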

In the next post we’ll process some real books …

Comments

Gary Davidson

2009-09-08T23:24:59Z

I cannot compile in VS 2008 or VS 2010.
VS 2010 says the build failed because of the method FromContinuations
in AgentSystem.fs:
type AsyncResultCell<'T>() =
       let source = new TaskCompletionSource<'T>()
       member this.RegisterResult r = source.SetResult(r)
       member this.AsyncWaitResult =
           Async.FromContinuations(fun (cont,_,_) ->
               let y = fun (t:Task<'T>) -> cont (t.Result)
               source.Task.ContinueWith(y) |> ignore)

Oh sorry,
it was called Primitive in VS10 B1. I’m using my dev machine to code this, which has B2.
Just change the code and it should work (or wait for B2 to show up :-) )

Gary Davidson

2009-09-11T19:51:09Z

Still having compile issues, sigh!!! So who do you have to sleep with to get b2? lol

If you replace it with Primitive you get:
Error 1  The method 'ContinueWith' is overloaded. Possible matches are shown below (or in the Error List window).  D:\Agents\AgentSystem.fs  169  32  Agents
Error 2  Possible overload: 'Task.ContinueWith(continuationAction: Action<Task<'T>>) : Task'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 3  Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,'TNewResult>) : Task<'TNewResult>'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 4  Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,Task<'TNewResult>>) : Task<'TNewResult>'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 5  Possible overload: 'Task.ContinueWith(continuationAction: Action<Task>) : Task'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 6  Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>) : Task<'TResult>'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 7  Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,Task<'TResult>>) : Task<'TResult>'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 8  Possible overload: 'Task.ContinueWith(continuationAction: Action<Task<'T>>, scheduler: TaskScheduler) : Task'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 9  Possible overload: 'Task.ContinueWith(continuationAction: Action<Task<'T>>, continuationOptions: TaskContinuationOptions) : Task'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 10  Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,'TNewResult>, scheduler: TaskScheduler) : Task<'TNewResult>'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 11  Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,'TNewResult>, continuationOptions: TaskContinuationOptions) : Task<'TNewResult>'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 12  Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,Task<'TNewResult>>, scheduler: TaskScheduler) : Task<'TNewResult>'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 13  Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,Task<'TNewResult>>, continuationOptions: TaskContinuationOptions) : Task<'TNewResult>'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 14  Possible overload: 'Task.ContinueWith(continuationAction: Action<Task>, scheduler: TaskScheduler) : Task'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 15  Possible overload: 'Task.ContinueWith(continuationAction: Action<Task>, continuationOptions: TaskContinuationOptions) : Task'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 16  Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, scheduler: TaskScheduler) : Task<'TResult>'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 17  Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, continuationOptions: TaskContinuationOptions) : Task<'TResult>'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 18  Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,Task<'TResult>>, scheduler: TaskScheduler) : Task<'TResult>'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 19  Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,Task<'TResult>>, continuationOptions: TaskContinuationOptions) : Task<'TResult>'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 20  Possible overload: 'Task.ContinueWith(continuationAction: Action<Task<'T>>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 21  Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,'TNewResult>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task<'TNewResult>'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 22  Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,Task<'TNewResult>>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task<'TNewResult>'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 23  Possible overload: 'Task.ContinueWith(continuationAction: Action<Task>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 24  Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task<'TResult>'.  D:\Agents\AgentSystem.fs  169  32  Agents
Error 25  Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,Task<'TResult>>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task<'TResult>'.  D:\Agents\AgentSystem.fs  169  32  Agents