Wednesday, May 21, 2014

Improve Matrices Operations Speed (Part 2)

This new post will describe, how to invert a matrix and which are the best methods. As mentioned in “How to Solve” paragraph, the conventional Algorithm for the inversion between matrices is the LU decomposition. This method is quite expensive in terms of speed but I know there is a parallel version too. I have tried to study it, but I am sorry, I didn’t accept the challenge to rewrite LU decomposition in parallel version, it needed time that I didn’t have and I am only a developer and not mathematician.

Fortunately, when I studied Strassen for multiplication there was on the book an interesting conclusion: With this “fast” matrix multiplication, Strassen also obtained a surprising result for matrix inversion.

This very intriguing phrase bring me to the following formula:

R1 = Inverse(a11)
R2 = a21 x R1
R3 = R1 x a12
R4 = a21 x R3
R5 = R4 - a22
R6 = Inverse(R5)
c12 = R3 x R6
c21 = R6 x R2
R7 = R3 x c21
c11 = R1 - R7
c22 = -R6

So what I learned was that it is possible to obtain the matrix (multiple of 2^n) inversion from some “lighter” operations. I implemented this algorithm and compared it with the Gauss Jordan one.
Comparing those two algorithms, I obtained, using my notebook, these results:

Matrices 512x512 of Complex (double, double)
GJ; 24sec
Strassen: 19sec
20% time less

Matrices 1024x1024 of Complex (double, double)
GJ; 3min 02sec
Strassen: 2min 05sec
38% time less!

Used Invertion Methods

Gauss Jordan LU decomposition


public static Complex[,] GaussJordanInvertion(Complex[,] mat, BackgroundWorker bw = null)
{
    //Gestione notifica progressione            
    int iPerc = 0;
    int iMax = mat.GetLength(0);
    int iDone = 0;

    Complex[,] res;
    res = MatrixGeneric.Create<complex>(iMax);
    res = (Complex[,])mat.Clone();

    int[,] indxc;
    indxc = MatrixGeneric.Create<int>(iMax, true); 
    int[,] indxr;
    indxr = MatrixGeneric.Create<int>(iMax, true);
    int[,] ipiv;
    ipiv = MatrixGeneric.Create<int>(iMax, true);
    Complex big;
    Complex pivinv;
    Complex dum;
    int iRow = 1;
    int iCol = 1;

    for (int i = 1; i <= iMax; i++)
        ipiv[i, 1] = 0;
    for (int i = 1; i <= iMax; i++)
    {
        big = 0;
        for (int j = 1; j <= iMax; j++)
            if (ipiv[j, 1] != 1)
                for (int k = 1; k <= iMax; k++)
                {
                    if (ipiv[k, 1] == 0)
                    {
                        if (Complex.Abs(res[j, k]) >= Complex.Abs(big))
                        {
                            big = Complex.Abs(res[j, k]);
                            iRow = j;
                            iCol = k;
                        }
                    }
                    else if (ipiv[k, 1] > 1)
                    {
                        //Error "Singular Matrix -1"
                    }
                }
        ++(ipiv[iCol, 1]);
        if (iRow != iCol)
        {
            for (int l = 1; l <= iMax; l++)
                Swap(res[iRow, l], res[iCol, l]);
        }
        indxr[i, 1] = iRow;
        indxc[i, 1] = iCol;
        if (res[iCol, iCol] == 0)
        {
            //Error "Singular Matrix -2"
        }
        pivinv = 1 / res[iCol, iCol];
        res[iCol, iCol] = 1;
        for (int l = 1; l <= iMax; l++)
            res[iCol, l] = res[iCol, l] * pivinv;
        for (int ll = 1; ll <= iMax; ll++)
            if (ll != iCol)
            {
                dum = res[ll, iCol];
                res[ll, iCol] = 0;
                for (int l = 1; l <= iMax; l++)
                {
                    res[ll, l] = res[ll, l] - res[iCol, l] * dum;
                }
            }        
    }

    for (int l = iMax; l > 0; l--)
    {
        if (indxr[l, 1] != indxc[l, 1])
            for (int k = 1; k <= iMax; k++)
                Swap(res[k, indxr[l, 1]], res[k, indxc[l, 1]]);
    }

    return res;
}

Strassen

public static Complex[,] Inverse(Complex[,] a, int iThreshold = 128)
{
    int iOriginalSize = a.GetLength(0);
    int iFmtSize = GetSize(a.GetLength(0));
    Complex[,] A = a;

    int iSize = A.GetLength(0);
    int iHalfSize = iSize / 2;

    if (iSize <= iThreshold)    {
        return GaussJordanInvertion(a);
    }

    Complex[,] A11 = null;
    Complex[,] A12 = null;
    Complex[,] A21 = null;
    Complex[,] A22 = null;

    Complex[,] R1 = null;
    Complex[,] R2 = null;
    Complex[,] R3 = null;
    Complex[,] R4 = null;
    Complex[,] R5 = null;
    Complex[,] R6 = null;
    Complex[,] R7 = null;

    Complex[,] C11 = null;
    Complex[,] C12 = null;
    Complex[,] C21 = null;
    Complex[,] C22 = null;


    A11 = GetMatrixPortion(A, iHalfSize, 1, 1);
    R1 = Inverse(A11);

    A21 = GetMatrixPortion(A, iHalfSize, iHalfSize + 1, 1);
    R2 = Mul_Strassen(A21, R1, iThreshold);

    A12 = GetMatrixPortion(A, iHalfSize, 1, iHalfSize + 1);
    R3 = Mul_Strassen(R1, A12, iThreshold);

    R4 = Mul_Strassen(A21, R3, iThreshold);

    A22 = GetMatrixPortion(A, iHalfSize, iHalfSize + 1, iHalfSize + 1);
    R5 = Subtract(R4, A22);

    R6 = Inverse(R5);

    C12 = Mul_Strassen(R3, R6, iThreshold);
    C21 = Mul_Strassen(R6, R2, iThreshold);
    R7 = Mul_Strassen(R3, C21, iThreshold);

    C11 = Subtract(R1, R7);
    C22 = Multiply(R6, -1);

    Complex[,] c = MatrixGeneric.Create<complex>(iSize, iSize);
    for (int i = 1; i <= iHalfSize; i++)
    {
        for (int j = 1; j <= iHalfSize; j++)
        {
            c[i, j] = C11[i, j];
            c[i, j + iHalfSize] = C12[i, j];
            c[i + iHalfSize, j] = C21[i, j];
            c[i + iHalfSize, j + iHalfSize] = C22[i, j];
        }
    }

    A11 = null;
    A12 = null;
    A21 = null;
    A22 = null;

    R1 = null;
    R2 = null;
    R3 = null;
    R4 = null;
    R5 = null;
    R6 = null;
    R7 = null;

    C11 = null;
    C12 = null;
    C21 = null;
    C22 = null;

    GC.Collect();

    return c;
}


Remember that the input matrix shoud be a 2^n multiple, if not! complete it with 0 and 1 on the diagonal.

Friday, May 2, 2014

Improve Matrices Operations Speed (Part 1)


In this Article I will try to show how to improve the calculation performance of typical matrix operations, eg. Multiplication or Inversion. At a beginning, I will introduce some information taken from Wikipedia but easily reached from any Algebric book that explains why this subject is quite important.

In the example proposed, I decided to use only squared matrices and starting from index 1 and not 0, so I used a method for create these (see MatrixGeneric.Create()). I used to work also with a Complex data type, but all discussions are valid also for more simple types like double or integer.

System of linear equations

From Wikipedia: In mathematics, a system of linear equations (or linear system) is a collection of linear equations involving the same set of variables. For example,


In mathematics, the theory of linear systems is the basis and a fundamental part of linear algebra, a subject which is used in most parts of modern mathematics. 

Computational algorithms for finding the solutions are an important part of numerical linear algebra, and play a prominent role in engineering, physics, chemistry, computer science, and economics. 

A system of non-linear equations can often be approximated by a linear system (see linearization), a helpful technique when making a mathematical model or computer simulation of a relatively complex system.

Matrix Equation

The vector equation is equivalent to a matrix equation of the form


where A is an m×n matrix, x is a column vector with n entries, and b is a column vector with m entries.


The number of vectors in a basis for the span is now expressed as the rank of the matrix.

How to solve: Methods

While systems of three or four equations can be readily solved by hand, computers are often used for larger systems. The standard algorithm for solving a system of linear equations is based on Gaussian elimination with some modifications. 

Firstly, it is essential to avoid division by small numbers, which may lead to inaccurate results. This can be done by reordering the equations if necessary, a process known as pivoting. Secondly, the algorithm does not exactly do Gaussian elimination, but it computes the LU decomposition of the matrix A. This is mostly an organizational tool, but it is much quicker if one has to solve several systems with the same matrix A but different vectors b.

Algorithms

The algorithms are easily founded, for example if you use to program with C or C++ a valid book is (Numerical Recipes in C written by William H., Flannery, Brian P., Teukolsky, Saul A). In general, the most critical  operations used are matrices multiplication or inversion so I will concentrate my attention in this way. 

I have created a Class dedicated to wrap all these methods:

public static class MatrixComplex

Multiplication

The multiplication is quite easy, it is a simple three nested loops, like these:

public static Complex[,] Multiply(Complex[,] a, Complex[,] b)
{
    int  iSize = a.GetLength(0);
    Complex[,] res = MatrixGeneric.Create<complex>( iSize, iSize);
    for (int row = 1; row <= iSize; row++)
    {
        for (int col = 1; col <= iSize; col++)
        {
            for (int i = 1; i <= iSize; i++)
            {
                res[row, col] += a[row, i] * b[i, col];
            }
        }
    }
    return res;
}

The problem with this kind of operation is that it has n^3 complexity, that means that with a large matrix it can take a lot of time, too much! For this reason there will be some possible options: the two that I considered for comparisons are Strassen Method and Parallel Computation.

Strassen Method 

In the mathematical discipline of linear algebra, the Strassen algorithm, named after Volker Strassen, is an algorithm used for matrix multiplication. It is faster than the standard matrix multiplication algorithm and is useful in practice for large matrices, but would be slower than the fastest known algorithm for extremely large matrices. (Wikipedia)

The complexity reached from Strassen Method is 2^2.8074 that means a lot of time less than standard method.

By the way this method is quite expensive in terms of memory, it uses many temporary smaller matrices and the input matrices must be 2^n squared matrix. So, if your input matrices are sized differently these must have their dimensions expanded to the next power of 2, which results in storing up to four times as many elements, and the seven auxiliary matrices each contain a quarter of the elements in the expanded ones.

In C# I suggest to compile in x64 (64bit), or looking for some trick in order to use the max memory size in x86 (32bit). (.NET Out Of Memory Exception - Used 1.3GB but have 16GB installed).

Strassen algorithm is a recursive method. It can be customized, a little, so you can try different variations to check the performances.

This is a C# version:

public static Complex[,] Strassen(Complex[,] a, Complex[,] b, int iThreshold = 128)
{
    int iSize = a.GetLength(0);
    Complex[,] c = MatrixGeneric.Create<complex>(iSize);
    int iNewSize = iSize / 2;

    if (iSize <= iThreshold) 
    {
        c = Multiply(a, b);
        return c;
    }

    Complex[,] A11 = MatrixGeneric.Create<complex>(iNewSize);
    Complex[,] A12 = MatrixGeneric.Create<complex>(iNewSize);
    Complex[,] A21 = MatrixGeneric.Create<complex>(iNewSize);
    Complex[,] A22 = MatrixGeneric.Create<complex>(iNewSize);

    Complex[,] B11 = MatrixGeneric.Create<complex>(iNewSize);
    Complex[,] B12 = MatrixGeneric.Create<complex>(iNewSize);
    Complex[,] B21 = MatrixGeneric.Create<complex>(iNewSize);
    Complex[,] B22 = MatrixGeneric.Create<complex>(iNewSize);

    Complex[,] C11 = MatrixGeneric.Create<complex>(iNewSize);
    Complex[,] C12 = MatrixGeneric.Create<complex>(iNewSize);
    Complex[,] C21 = MatrixGeneric.Create<complex>(iNewSize);
    Complex[,] C22 = MatrixGeneric.Create<complex>(iNewSize);

    Complex[,] M1 = null;
    Complex[,] M2 = null;
    Complex[,] M3 = null;
    Complex[,] M4 = null;
    Complex[,] M5 = null;
    Complex[,] M6 = null;
    Complex[,] M7 = null;
    Complex[,] AResult = null;
    Complex[,] BResult = null;
                      
    for (int i = 1; i <= iNewSize; i++)
    {
        for (int j = 1; j <= iNewSize; j++)
        {
            A11[i, j] = a[i, j];
            A12[i, j] = a[i, j + iNewSize];
            A21[i, j] = a[i + iNewSize, j];
            A22[i, j] = a[i + iNewSize, j + iNewSize];

            B11[i, j] = b[i, j];
            B12[i, j] = b[i, j + iNewSize];
            B21[i, j] = b[i + iNewSize, j];
            B22[i, j] = b[i + iNewSize, j + iNewSize];
        }
    }

    AResult = Sum(A11, A22);
    BResult = Sum(B11, B22);
    M1 = Strassen(AResult, BResult); 

    AResult = Sum(A21, A22);              
    M2 = Strassen(AResult, B11);       

    BResult = Subtract(B12, B22);              
    M3 = Strassen(A11, BResult);       

    BResult = Subtract(B21, B11);           
    M4 = Strassen(A22, BResult);       

    AResult = Sum(A11, A12);           
    M5 = Strassen(AResult, B22);       

    AResult = Subtract(A21, A11);
    BResult = Sum(B11, B12);             
    M6 = Strassen(AResult, BResult);    

    AResult = Subtract(A12, A22);
    BResult = Sum(B21, B22);             
    M7 = Strassen(AResult, BResult);     

    AResult = Sum(M1, M4);
    BResult = Subtract(M7, M5);
    C11 = Sum(AResult, BResult);

    C12 = Sum(M3, M5);

    C21 = Sum(M2, M4);

    //C22 = M1 + M3 - M2 + M6;
    AResult = Sum(M1, M3);
    BResult = Subtract(M6, M2);
    C22 = Sum(AResult, BResult);

    for (int i = 1; i <= iNewSize; i++)
    {
        for (int j = 1; j <= iNewSize; j++)
        {
            c[i, j] = C11[i, j];
            c[i, j + iNewSize] = C12[i, j];
            c[i + iNewSize, j] = C21[i, j];
            c[i + iNewSize, j + iNewSize] = C22[i, j];
        }
    }

    A11 = null;
    A12 = null;
    A21 = null;
    A22 = null;

    B11 = null;
    B12 = null;
    B21 = null;
    B22 = null;

    C11 = null;
    C12 = null;
    C21 = null;
    C22 = null;

    M1 = null;
    M2 = null;
    M3 = null;
    M4 = null;
    M5 = null;
    M6 = null;
    M7 = null;

    AResult = null;
    BResult = null;

    GC.Collect();

    return c;
}

At the end of calculation I force GC.Collect() in order to optimize the memory usage. Take care that this method can’t be called directly. Take care that the input matrices must be 2^n size.

In alternative use the below method. It resize the matrices before the multiplication.

public static Complex[,] Mul_Strassen(Complex[,] a, Complex[,] b, int iThreshold = 128)
{
    int iPrevSize = a.GetLength(0);
    int iNewSize = GetSize(a.GetLength(0));

    a = PrepareMatrix(a, iNewSize);
    b = PrepareMatrix(b, iNewSize);

    Complex[,] c = Strassen(a, b, iThreshold);

    c = GetMatrixPortion(c, iPrevSize);

    return c;
}

Standard Parallel Method

In my post C# (Multithreading) - A Wrapper Class, I exposed how and when to implement a parallel calculation. In multiplication calculus it is quite easy to do that, so using the MPBox class I modify the standard method.

Here the code:

private static Complex[,] A = MatrixGeneric.Create<Complex>(0);
private static Complex[,] B = MatrixGeneric.Create<Complex>(0);

public static Complex[,] Mul_Parallel(Complex[,] a, Complex[,] b)
{
    Utility.TraceLog("Begin Parallel Mul()");
    A = null;
    B = null;
    A = (Complex[,])a.Clone();
    B = (Complex[,])b.Clone();

    int iSize = A.GetLength(0);
    if (a.GetLength(0) != a.GetLength(1))
        throw new Exception("Not compatibles Matrices!");

    C = MatrixGeneric.Create<complex>(iSize);
    MyParamsItem[] aLoops = null;
    MPDefaultProc myp = new MPDefaultProc(MP_Mul);

    moMPBox.SetCicles(ref aLoops, 1, A.GetLength(0));

    moMPBox.Clear();
    for (int j = 0; j < aLoops.Length; j++)
    {
        moMPBox.Add(myp, aLoops[j]);
    }
    moMPBox.Start(); 
    Utility.TraceLog("End Parallel Mul()");

    A = null;
    B = null;
    return C;
}

public static void MP_Mul(MyParamsItem aoParam)
{
    int iSize = A.GetLength(0);
    for (int row = aoParam.min; row <= aoParam.max; row++) 
    {
        for (int col = 1; col <= iSize; col++) 
        {
            for (int i = 1; i <= iSize; i++) 
            {
                C[row, col] += A[row, i] * B[i, col]; 
            }
        }
    }        
}
Now you can easily compare the performances of the three methods exposed. You will see that the parallel method is the best one. This is because it reach better results compared with Strassen, but it doesn’t use so much memory .