LAgent: an agent framework in F# – Part VIII - Implementing MapReduce (user model)
Luca Bolognese - Download framework here.
All posts are here:
- Part I - Workers and ParallelWorkers
- Part II - Agents and control messages
- Part III - Default error management
- Part IV - Custom error management
- Part V - Timeout management
- Part VI - Hot swapping of code
- Part VII - An auction framework
- Part VIII – Implementing MapReduce (user model)
- Part IX – Counting words …
For this post I use a newer version of the framework that I just uploaded to CodeGallery. While using LAgent I grew more and more unhappy with the weakly typed way of sending messages. The code that implements that feature is nasty: full of upcasts and downcasts. I was losing faith in it. Bugs were cropping up in all sorts of scenarios (e.g. using generic union types as messages).
In the end I decided to re-architect the framework to make it strongly typed. In essence, each agent can now receive messages of a single type only. The limitations that this design choice introduces (e.g. more limited hot swapping) are compensated by catching errors at compile time and by the streamlining of the code. I left the old framework on the site in case you disagree with me.
In any case, today’s post is about MapReduce. It assumes that you know what it is (the original Google paper that served as inspiration is here: Google Research Publication - MapReduce). What would it take to implement an in-memory MapReduce using my agent framework?
Let’s start with the user model.
let mapReduce
        (inputs:seq<'in_key * 'in_value>)
        (map:'in_key -> 'in_value -> seq<'out_key * 'out_value>)
        (reduce:'out_key -> seq<'out_value> -> seq<'reducedValues>)
        outputAgent M R partitionF =
mapReduce takes seven parameters:
- inputs: a sequence of input key/value pairs.
- map: this function operates on each input key/value pair. It returns a sequence of output key/value pairs. The type of the output sequence can be different from the type of the inputs.
- reduce: this function operates on an output key and all the values associated with it. It returns a sequence of reduced values (e.g. the average of all the values for this key).
- outputAgent: this is the agent that gets notified every time a new output key has been reduced, and once more when the whole operation ends.
- M: how many mapper agents to instantiate
- R: how many reducer agents to instantiate
- partitionF: the partition function used to choose which of the reducers is associated with a key
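To make the roles of map and reduce concrete, here is a minimal sequential sketch of what the user model computes, leaving out the agents, M, R and partitionF entirely. The name mapReduceSequential is hypothetical and is not part of LAgent; it only mirrors the map and reduce signatures shown above and returns the results as a list instead of notifying an output agent.

```fsharp
// Hypothetical sequential reference (not part of LAgent): same map/reduce
// shapes as mapReduce, but no agents and no partitioning.
let mapReduceSequential
        (inputs: seq<'in_key * 'in_value>)
        (map: 'in_key -> 'in_value -> seq<'out_key * 'out_value>)
        (reduce: 'out_key -> seq<'out_value> -> seq<'reduced>) =
    inputs
    |> Seq.collect (fun (k, v) -> map k v)      // run map on every input pair
    |> Seq.groupBy fst                          // gather the pairs per output key
    |> Seq.map (fun (key, pairs) ->
        // hand each key and its values to reduce
        key, reduce key (pairs |> Seq.map snd) |> List.ofSeq)
    |> List.ofSeq

// Usage: count words in a single fake file.
let counts =
    mapReduceSequential
        ["File1", "a b a"]
        (fun _ (text: string) -> text.Split(' ') |> Seq.map (fun w -> w, 1))
        (fun _ values -> Seq.singleton (Seq.sum values))
// counts = [("a", [2]); ("b", [1])]
```

The agent-based version distributes the first step across M mappers and the last step across R reducers, with partitionF deciding which reducer owns each key.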
Let’s look at how to use this function to find how often each word is used in a set of files. First a simple partition function can be defined as:
let partitionF = fun key M -> abs(key.GetHashCode()) % M
Given a key and a number of buckets, it picks one of the buckets. Its type is 'a -> int -> int, so it’s fairly reusable.
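One edge case is worth keeping in mind: a hash code equal to Int32.MinValue has no positive 32-bit counterpart, so taking its absolute value cannot yield a valid bucket index. The sketch below is one possible variant that masks the sign bit instead. The name safePartitionF is hypothetical, and it uses the built-in hash function rather than GetHashCode, which adds an equality constraint on the key type.

```fsharp
// Hypothetical variant of the partition function (not from the framework):
// clear the sign bit instead of calling abs, so the result is always
// in the range 0 .. buckets - 1.
let safePartitionF (key: 'a) (buckets: int) : int =
    (hash key &&& 0x7FFFFFFF) % buckets

// Usage: every key lands in bucket 0 or 1 when there are two reducers.
let bucketForHome = safePartitionF "home" 2
```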
Let’s also create a basic agent that just prints out the reduced values:
let printer = spawnWorker (fun msg ->
    match msg with
    | Reduced(key, value) -> printfn "%A %A" key value
    | MapReduceDone -> printfn "All done!!")
The agent gets notified whenever a new key is reduced or the algorithm ends. It is useful to be notified immediately instead of waiting for everything to be done. If I hadn’t written this code using agents I would not have realized that possibility. I would simply have framed the problem as a function that takes an input and returns an output. Agents force you to think explicitly about the parallelism in your app. That’s a good thing.
The mapping function simply splits the content of a file into words and adds a word/1 pair to the list for each one. I know that there are much better ways to do this (e.g. regular expressions for the parsing and summing word counts inside the function), but I wanted to test the basic framework capabilities, and doing it this way exercises them better.
open System.Collections.Generic

let map = fun (fileName:string) (fileContent:string) ->
    let l = new List<string * int>()
    // Whitespace and punctuation delimiters (the escape backslashes are required)
    let wordDelims = [|' ';',';';';'.';':';'?';'!';'(';')';'\n';'\t';'\f';'\r';'\b'|]
    fileContent.Split(wordDelims) |> Seq.iter (fun word -> l.Add((word, 1)))
    l :> seq<string * int>
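For comparison, the "better way" mentioned above (regular expressions for parsing, plus summing the counts inside the mapper) could look roughly like the sketch below. The name mapWithLocalCounts and the \w+ pattern are assumptions made for illustration. Because it still returns word/count pairs, a reducer that sums its values keeps working unchanged.

```fsharp
open System.Text.RegularExpressions

// Hypothetical alternative mapper (not from the post): tokenise with a regex
// and pre-aggregate the counts per file, so each word is emitted only once.
let mapWithLocalCounts (fileName: string) (fileContent: string) : seq<string * int> =
    Regex.Matches(fileContent, @"\w+")      // runs of word characters
    |> Seq.cast<Match>
    |> Seq.countBy (fun m -> m.Value)       // (word, occurrences in this file)

// Usage:
let localCounts = mapWithLocalCounts "File1" "I saw what I saw" |> List.ofSeq
// localCounts = [("I", 2); ("saw", 2); ("what", 1)]
```

Pre-aggregating in the mapper reduces the number of messages sent to the reducers, at the cost of exercising the framework less, which is the trade-off described above.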
The reducer function simply sums the various word statistics sent by the mappers:
let reduce = fun key (values:seq<int>) -> [values |> Seq.sum] :> seq<int>
Now we can create some fake input to check that it works:
let testInput = ["File1", "I was going to the airport when I saw someone crossing";
                 "File2", "I was going home when I saw you coming toward me"]
And execute the mapReduce:
mapReduce testInput map reduce printer 2 2 partitionF
On my machine I get the following. You might get a different order because of the async/parallel processing involved. If I wanted a stable order I would need to change the printer agent to cache results on Reduced and process them on MapReduceDone (see next post).
"I" [4]
"crossing" [1]
"going" [2]
"home" [1]
"me" [1]
"the" [1]
"toward" [1]
"airport" [1]
"coming" [1]
"saw" [2]
"someone" [1]
"to" [1]
"was" [2]
"when" [2]
"you" [1]
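The stable-order idea mentioned above (cache results on Reduced, process them on MapReduceDone) can be sketched as follows. This is only an illustration under stated assumptions: it uses the standard MailboxProcessor as a stand-in for LAgent's spawnWorker, and the OutputMessage type is a guess at the shape of the framework's messages, whose real definition may differ. The name sortedCollector is hypothetical.

```fsharp
// Assumed shape of the output messages; the framework's own definition may differ.
type OutputMessage<'key, 'value> =
    | Reduced of 'key * seq<'value>
    | MapReduceDone

// Stand-in for the printer agent, built on MailboxProcessor instead of spawnWorker.
// It buffers every Reduced message and hands the sorted results to onDone
// when MapReduceDone arrives, after which the agent stops.
let sortedCollector (onDone: (string * int list) list -> unit) =
    MailboxProcessor<OutputMessage<string, int>>.Start(fun inbox ->
        let rec loop acc = async {
            let! msg = inbox.Receive()
            match msg with
            | Reduced (key, values) ->
                return! loop ((key, List.ofSeq values) :: acc)
            | MapReduceDone ->
                onDone (List.sortBy fst acc) }
        loop [])

// Usage: results are posted in arbitrary order and printed sorted by key.
let collector = sortedCollector (List.iter (fun (k, v) -> printfn "%A %A" k v))
collector.Post(Reduced ("was", seq [2]))
collector.Post(Reduced ("I", seq [4]))
collector.Post MapReduceDone
```

The printing happens asynchronously on the agent, so a short script may need to wait before exiting. The price of a stable order is that nothing is shown until the whole operation has finished.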
In the next post we’ll process some real books …
Comments
Gary Davidson
2009-09-08T23:24:59Z
I cannot compile in VS 2008 or VS2010.
VS2010 says the build failed because of the method FromContinuations
in AgentSystem.fs:
type AsyncResultCell<'T>() =
    let source = new TaskCompletionSource<'T>()
    member this.RegisterResult r = source.SetResult(r)
    member this.AsyncWaitResult =
        Async.FromContinuations(fun (cont,_,_) ->
            let y = fun (t:Task<'T>) -> cont (t.Result)
            source.Task.ContinueWith(y) |> ignore)
lucabol
2009-09-09T17:40:57Z
Oh sorry,
it was called Primitive in VS10 B1. I’m using my dev machine to code this, which has B2.
Just change the code and it should work (or wait for B2 to show up :-) )
Gary Davidson
2009-09-11T19:51:09Z
Still having compile issues, sigh!!! So who do you have to sleep with to get b2? lol
Fanboy
2009-09-21T16:08:54Z
If you replace it with Primitive you get the following errors, all reported at D:\Agents\AgentSystem.fs, line 169, column 32, project Agents:
Error 1: The method 'ContinueWith' is overloaded. Possible matches are shown below (or in the Error List window)
Error 2: Possible overload: 'Task.ContinueWith(continuationAction: Action<Task<'T>>) : Task'.
Error 3: Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,'TNewResult>) : Task<'TNewResult>'.
Error 4: Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,Task<'TNewResult>>) : Task<'TNewResult>'.
Error 5: Possible overload: 'Task.ContinueWith(continuationAction: Action<Task>) : Task'.
Error 6: Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>) : Task<'TResult>'.
Error 7: Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,Task<'TResult>>) : Task<'TResult>'.
Error 8: Possible overload: 'Task.ContinueWith(continuationAction: Action<Task<'T>>, scheduler: TaskScheduler) : Task'.
Error 9: Possible overload: 'Task.ContinueWith(continuationAction: Action<Task<'T>>, continuationOptions: TaskContinuationOptions) : Task'.
Error 10: Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,'TNewResult>, scheduler: TaskScheduler) : Task<'TNewResult>'.
Error 11: Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,'TNewResult>, continuationOptions: TaskContinuationOptions) : Task<'TNewResult>'.
Error 12: Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,Task<'TNewResult>>, scheduler: TaskScheduler) : Task<'TNewResult>'.
Error 13: Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,Task<'TNewResult>>, continuationOptions: TaskContinuationOptions) : Task<'TNewResult>'.
Error 14: Possible overload: 'Task.ContinueWith(continuationAction: Action<Task>, scheduler: TaskScheduler) : Task'.
Error 15: Possible overload: 'Task.ContinueWith(continuationAction: Action<Task>, continuationOptions: TaskContinuationOptions) : Task'.
Error 16: Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, scheduler: TaskScheduler) : Task<'TResult>'.
Error 17: Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, continuationOptions: TaskContinuationOptions) : Task<'TResult>'.
Error 18: Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,Task<'TResult>>, scheduler: TaskScheduler) : Task<'TResult>'.
Error 19: Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,Task<'TResult>>, continuationOptions: TaskContinuationOptions) : Task<'TResult>'.
Error 20: Possible overload: 'Task.ContinueWith(continuationAction: Action<Task<'T>>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task'.
Error 21: Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,'TNewResult>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task<'TNewResult>'.
Error 22: Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,Task<'TNewResult>>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task<'TNewResult>'.
Error 23: Possible overload: 'Task.ContinueWith(continuationAction: Action<Task>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task'.
Error 24: Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task<'TResult>'.
Error 25: Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,Task<'TResult>>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task<'TResult>'.
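The overload errors listed above suggest that the compiler could not decide which delegate type the F# function value y should be converted to. One possible workaround, untested against the Visual Studio betas discussed in this thread and not confirmed by the author, is to construct the delegate explicitly so that only one ContinueWith overload applies. The sketch below keeps the AsyncResultCell shape quoted in the first comment and assumes a current F# compiler, where the method is called Async.FromContinuations.

```fsharp
open System
open System.Threading.Tasks

// Sketch of the quoted AsyncResultCell with an explicitly typed delegate.
type AsyncResultCell<'T>() =
    let source = new TaskCompletionSource<'T>()
    member this.RegisterResult r = source.SetResult(r)
    member this.AsyncWaitResult =
        Async.FromContinuations(fun (cont, _, _) ->
            // Wrapping the lambda in Action<Task<'T>> pins down a single overload.
            let y = Action<Task<'T>>(fun t -> cont t.Result)
            source.Task.ContinueWith(y) |> ignore)

// Usage: register a result, then wait for it.
let cell = AsyncResultCell<int>()
cell.RegisterResult 42
let result = cell.AsyncWaitResult |> Async.RunSynchronously
// result = 42
```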