
Outperforming MathNet with Task Parallel Library

February 25, 2012

Math.NET is a great project that brings numerics to .NET the OSS way. Perusing their blog, I found this post on the on-line algorithm for std calculation. A quick Wikipedia search turned up this article, which describes a parallel algorithm due to Chan for calculating variance (std = sqrt(variance)). So I set out to figure out whether I could improve on Math.NET in the case of a large dataset.

First of all, my algorithm is slightly different from Math.NET’s. I have implemented the on-line algorithm from the Wikipedia article, since Chan’s parallel algorithm uses it to merge datasets calculated “on-line”:

(Full sources on my GitHub)

    open System.Diagnostics
    open System.Threading.Tasks
    open System.Collections.Concurrent

    [<DebuggerDisplay("{Display}")>]
    type auxValues =
        { M2 : float; mean : float; n : float }
        with
            member private a.Display =
                "M2: " + a.M2.ToString() + "; mean: " + a.mean.ToString() + "; n: " + a.n.ToString()

    // On-line (single-pass) variance accumulation over data.[start..finish],
    // the algorithm from the Wikipedia article (Welford's method)
    let auxValuesOfSet (data : double []) start finish =
        let mutable n = 0.
        let mutable mean = 0.
        let mutable M2 = 0.

        for x = start to finish do
            n <- n + 1.
            let delta = data.[x] - mean
            mean <- mean + delta / n
            if n > 1. then M2 <- M2 + delta * (data.[x] - mean)

        {M2 = M2; mean = mean; n = float (finish - start + 1)}

    // Population variance of the whole array: M2 / N
    let Variance (data : double []) =
        (auxValuesOfSet data 0 (data.Length - 1)).M2 / (float data.Length)
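
A quick usage check (my own snippet) confirms the population-variance convention, since M2 is divided by N rather than N – 1:

    // Population variance of [|1.; 2.; 3.; 4.|]: mean = 2.5,
    // squared deviations sum to 5.0, so 5.0 / 4 = 1.25
    printfn "%f" (Variance [| 1.; 2.; 3.; 4. |])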

Math.NET implements the “naive” algorithm and, as my experiments showed, it performs better than the on-line algorithm on large datasets: on a dataset of 100,000,000 doubles, Math.NET takes about 3330 ms, while the on-line algorithm takes about 4260 ms. The driver program source is here.
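
For reference, the “naive” algorithm (in the Wikipedia article’s terminology) accumulates the sum and the sum of squares in a single pass; here is my own sketch of that formulation, not necessarily Math.NET’s actual code:

    // Naive single-pass variance: Var(X) = E[X^2] - E[X]^2.
    // Fast, but can lose precision when the mean is large
    // relative to the variance.
    let VarianceNaive (data : double []) =
        let mutable sum = 0.
        let mutable sumSq = 0.
        for x in data do
            sum <- sum + x
            sumSq <- sumSq + x * x
        let n = float data.Length
        (sumSq - sum * sum / n) / n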

Now the fun begins with parallel. I am using TPL (Task Parallel Library) with its promise to utilize all of the cores of my machine (Intel i5, 4 logical cores @ 3.2 GHz, 8Gb RAM).

My first crack at parallelizing was to simply split the dataset in two and run both halves as parallel tasks:

    // Chan's algorithm: merge the partial results of two disjoint
    // subsets into the result for their union
    let combineM2s (r1 : auxValues) (r2 : auxValues) =
        let delta = r1.mean - r2.mean
        let deltaSq = delta * delta
        let n = r1.n + r2.n
        // M2 = M2_1 + M2_2 + delta^2 * n_1 * n_2 / n
        let M2 = r1.M2 + r2.M2 + deltaSq * r1.n * r2.n / n
        let mean = (r1.n * r1.mean + r2.n * r2.mean) / n

        {M2 = M2; mean = mean; n = n}

    let Variance2(data : double []) =
        let partition = data.Length / 2
        let finish = data.Length - 1
        let tasks = 
            [|
                Task.Factory.StartNew(fun () -> auxValuesOfSet data 0 partition) 
                Task.Factory.StartNew(fun () -> auxValuesOfSet data (partition + 1) finish)
            |]

        let results = Task.Factory.ContinueWhenAll(tasks, fun tasks -> tasks |> Array.map(fun (v : Task<auxValues>) -> v.Result)).Result
        let res = combineM2s results.[0] results.[1]
        res.M2 / res.n

Surprisingly enough, performance, instead of doubling as I expected, grew by about a factor of 8: 478 ms for the on-line algorithm, down from 4260 ms. Here combineM2s implements Chan’s parallel algorithm, which merges two sets of variance calculations.
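
As a quick sanity check (my own snippet, not part of the original driver), the merged two-task result should reproduce the sequential one up to floating-point rounding:

    // Hypothetical check: sequential vs. two-task results
    // should agree up to floating-point noise
    let sanityCheck () =
        let rnd = System.Random(42)
        let data = Array.init 1000000 (fun _ -> rnd.NextDouble())
        printfn "sequential: %.12f" (Variance data)
        printfn "two tasks : %.12f" (Variance2 data)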

With the calculations now flying, I decided to do it right and have TPL partition my dataset optimally between the cores. I then wrote the following, which, naturally, does not work (although it took me some time to figure out why):

    let VarianceForDoesntWork (data : double []) =
        let monitor = new obj()
        let m2 = ref {M2 = 0.; n = 0.; mean = 0.}

        Parallel.ForEach(
            Partitioner.Create(0, data.Length),
            // localInit: a fresh accumulator for each task
            (fun () -> {M2 = 0.; n = 0.; mean = 0.}),
            // body: BUG - ignores the task-local accumulator it was handed
            (fun range state local -> auxValuesOfSet data (fst range) ((snd range) - 1)),
            // localFinally: merge each task's last local value into the global result
            (fun (local : auxValues) -> lock monitor (fun () -> m2 := combineM2s local !m2))) |> ignore

        (!m2).M2 / (!m2).n

So why does this not work? Superficially, everything seems in order. We have a Parallel.ForEach that partitions our collection into chunks by creating tuples that stand for ranges of indices into the collection: [inclusive, exclusive). Each task gets a local accumulator (the localInit argument), the on-line algorithm is called on each range (the loop body), and Chan’s algorithm is called to keep accumulating the results of the individual on-line calculations (the localFinally argument).

This does not work because ForEach accumulates local and final results not per iteration, but per task. A task, however, may be composed of several iterations, and the body above ignores the local value passed into it and returns only the result for the current range, so all but the last range of each task is discarded. This form of ForEach is in actuality an aggregator and needs to be written accordingly. The following implementation works:

    let VarianceForCummul (data : double []) =
        let monitor = new obj()
        let m2list : auxValues list ref = ref []

        Parallel.ForEach(
            Partitioner.Create(0, data.Length),
            // localInit: each task accumulates a list of per-range results
            (fun () -> ref []),
            // body: prepend this range's result to the task-local list
            (fun range state local ->
                local := auxValuesOfSet data (fst range) ((snd range) - 1) :: !local
                local),
            // localFinally: append the task's list to the global list
            (fun local -> lock monitor (fun () -> m2list := !m2list @ !local))) |> ignore

        let res = !m2list |> List.reduce combineM2s
        res.M2 / res.n

Here each task produces a list of partial results (what the Wiki article calls M2’s), one for every range the task gets to execute, and the final step folds them all into a single result by calling List.reduce with Chan’s aggregation.
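
Incidentally, merging with a zero-valued record is an identity for combineM2s, so the list is not strictly necessary; here is a variant of my own (the name VarianceForMerge is hypothetical) that folds each range straight into the task-local accumulator:

    // Sketch: merge eagerly instead of collecting a list. Relies on
    // combineM2s treating {M2 = 0; mean = 0; n = 0} as an identity element.
    let VarianceForMerge (data : double []) =
        let monitor = new obj()
        let total = ref {M2 = 0.; mean = 0.; n = 0.}

        Parallel.ForEach(
            Partitioner.Create(0, data.Length),
            // localInit: the zero accumulator
            (fun () -> {M2 = 0.; mean = 0.; n = 0.}),
            // body: merge this range's result into the task-local accumulator
            (fun range state local ->
                combineM2s local (auxValuesOfSet data (fst range) ((snd range) - 1))),
            // localFinally: merge the task's accumulator into the global total
            (fun local -> lock monitor (fun () -> total := combineM2s local !total))) |> ignore

        (!total).M2 / (!total).n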

Finally, a home-grown implementation of Parallel.ForEach:

    let VarianceForTask (data : double []) =
        let partitioner = Partitioner.Create(0, data.Length)
        // Take the partitioner's chunks and hand each one to its own task
        let partitions = partitioner.GetDynamicPartitions()

        let tasks =
            [| for p in partitions ->
                Task.Factory.StartNew(fun () -> auxValuesOfSet data (fst p) ((snd p) - 1))
            |]

        let results = Task.Factory.ContinueWhenAll(tasks, fun tasks -> tasks |> Array.map(fun (v : Task<auxValues>) -> v.Result)).Result

        let res = results |> Array.reduce combineM2s
        res.M2 / res.n

This works just as well as the Parallel.ForEach version in my experiments: both give around 350-370 ms, a roughly 20% improvement over the two-task “naive parallel” algorithm.
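
For completeness, the timings above can be reproduced with a simple Stopwatch harness along these lines (my own sketch, assuming the functions and opens from the listings above; the real driver source is linked earlier):

    // Minimal timing harness (sketch); Stopwatch comes from System.Diagnostics
    let time name f =
        let sw = Stopwatch.StartNew()
        let v : float = f ()
        sw.Stop()
        printfn "%s: %f (%d ms)" name v sw.ElapsedMilliseconds

    let run () =
        let rnd = System.Random(0)
        let data = Array.init 100000000 (fun _ -> rnd.NextDouble())
        time "on-line   " (fun () -> Variance data)
        time "two tasks " (fun () -> Variance2 data)
        time "ForEach   " (fun () -> VarianceForCummul data)
        time "tasks     " (fun () -> VarianceForTask data)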