Thursday, 24 July 2014

Distributed processing with BASH via LSF's blaunch

LSF is currently a de-facto standard for managing computing queues, and many inexpensive CPU or GPU clusters use it. The majority of current academic HPC codes use MPI for their core computations. While the parallelisation/distribution of the computational tasks is achieved by the core MPI code, the LSF batch job script, usually invoked via a shell, is still serial code (see here). This can become a bottleneck when the batch script needs to run a large set of commands in sequence that could instead be executed in a distributed way.

Idle CPUs

An example task: your parallel code generates a large set of output files and you need to process them further, such as compressing all the data files produced by the processes. If you launch $n$ processes, then after completion of the MPI code the compression of the $n$ data files runs in a single process. This not only leaves $n-1$ CPUs idle, it also multiplies the wall-clock time roughly $n$-fold while the compression runs sequentially. To overcome this limitation LSF provides a tool called blaunch for distributing such tasks. The documentation provided by IBM is really fantastic; however, it lacks a simple example for novice users. In this post I provide a simple example that compresses $n$ data files using $n$ distributed processes.

Job distribution via blaunch

The first step in this task is to figure out which hostnames our LSF job script is currently running on. These can be obtained from the $LSB_HOSTS environment variable. The second step is to write this information into a hosts file that can be read by blaunch. The compression script must be provided in a separate file. This is an important point, because the whole concept of blaunch is that it distributes the given command to all available processes. Note that the LSF process running your job script is not one of the distributed host processes, at least not transparently. Knowing this, we can differentiate the task ID only outside of our job submit script, using $LSF_PM_TASKID.

Here is a possible portion of the job script, using BASH:

cd /mydata
rm -f hosts.txt
export myhosts=($LSB_HOSTS)
for hostName in ${myhosts[*]}; do echo $hostName >> hosts.txt ; done
blaunch -u hosts.txt compress.sh
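Outside of LSF, the hosts-file construction can be sketched by setting a dummy $LSB_HOSTS value by hand (the hostnames below are hypothetical; inside a real job LSF exports one hostname per allocated slot):

```shell
# Simulate the LSB_HOSTS value that LSF exports inside a job;
# the hostnames here are made up for illustration.
LSB_HOSTS="node01 node01 node02 node03"

# Same loop as in the job script: one line per slot goes into hosts.txt
rm -f hosts.txt
for hostName in $LSB_HOSTS; do echo "$hostName" >> hosts.txt ; done

cat hosts.txt
# node01
# node01
# node02
# node03
```

Repeated hostnames are expected: a host contributing two slots appears twice, so blaunch starts two tasks there.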

As you can see, we don't use $LSF_PM_TASKID in the job script, but rather in the compression script that is going to be launched on each of the distributed processes:

export trajF=(`find . -name '*.dat'`)
export ii=`expr $LSF_PM_TASKID - 1`
echo ${trajF[$ii]}
xz -4e --format=lzma ${trajF[$ii]}

We assume that the xz command is available in the BASH environment. Note that we use a bash array to identify the file names, and your output naming convention should match this identification so that each task index maps to exactly one file.
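The mapping from $LSF_PM_TASKID to a file can be checked outside of LSF by setting the variable by hand. A minimal plain-shell sketch, with placeholder file names standing in for the find output:

```shell
# Fake the per-task ID that blaunch would export; tasks are numbered from 1.
LSF_PM_TASKID=2

# Stand-in for the files that compress.sh finds (names are placeholders).
set -- run1.dat run2.dat run3.dat

# Same arithmetic as compress.sh: subtract 1 to move to zero-based indexing,
# here used to shift the argument list so that $1 is the selected file.
ii=`expr $LSF_PM_TASKID - 1`
shift $ii
echo "$1"
# run2.dat
```

Task 1 would pick run1.dat, task 2 picks run2.dat, and so on, so $n$ tasks cover $n$ files with no overlap.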

Conclusion

We have provided a simple working example of launching parallel shell script tasks using blaunch. A similar approach is presented here. An alternative approach could be to use GNU Parallel, which is discussed here.


Wednesday, 7 May 2014

Euclid Algorithm for Set of Integers: 'Reduce' vs. trees in R

The Euclid algorithm computes the greatest common divisor (GCD) of two natural numbers $x_{1}$ and $x_{2}$, denoted by $GCD(x_{1}, x_{2})$: the largest integer that divides both $x_{1}$ and $x_{2}$. The solution was proposed by Euclid in ancient Greece. It can be formulated as a recurrence relation:
$$x_{k} = x_{k-1} \bmod x_{k-2}.$$
The 'modulo' binary operation returns the remainder of a division. The stopping criterion for the recurrence is reached when $x_{k-2}=0$, and the result of the GCD is the current value of $x_{k-1}$. This process can be visualised as successive divisions. Let's implement this in R in a naive way.
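For example, tracing $GCD(48, 18)$ as successive divisions:

$$48 = 2 \times 18 + 12, \qquad 18 = 1 \times 12 + 6, \qquad 12 = 2 \times 6 + 0,$$

so the last non-zero remainder gives $GCD(48, 18) = 6$.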

# Naive Euclid algorithm by msuzen
gcd <- function(a, b)
{
  rk_1 <- a;
  rk_2 <- b;
  # Recurrence Formula:  r_k =  r_k-1 modulo r_k-2
  # Increment k until r_k-2 == 0 
  while(rk_2 != 0) {
    rk      <- rk_1%%rk_2; # remainder
    rk_1    <- rk_2;       # proceed in recurrence
    rk_2    <- rk;
  }
  return(rk_1)
}

This is a straightforward task. Let's make the problem a little more generic. What happens if we would like to know the GCD of $n$ natural numbers, $x_{1},\ldots, x_{n}$? Then a solution is to apply the GCD operation pairwise; for example, if $n=3$:
$$GCD(x_{1}, GCD(x_{2}, x_{3})) = GCD(GCD(x_{1}, x_{2}), x_{3})$$
How can we implement this for a vector of non-negative integers?

Tree Approach

The simplest way to reach the GCD of $n$ numbers is probably to think of this process as a binary tree, formed by pairing elements of the set of integers as we obtain GCDs. It is relatively easy to implement because the ordering of the pairs is not important (GCD is associative). We can start from the beginning and pair up as we obtain the results. Here is the naive implementation.

gcdN <- function(X) {
  n      <- length(X)
  gcdEnd <- X[1]
  if(n == 1) return(gcdEnd) # guard: 2:n would count down when n = 1
  for(i in 2:n) {
     gcdEnd <- gcd(gcdEnd, X[i])
  }
  return(gcdEnd)
}

'Reduce' operation

The so-called tree approach we have given above is actually nothing but a Reduce operation, in the MapReduce sense. The function gcdN can be replaced with a single line:
Reduce("gcd", X)
This example looks trivial, but having Reduce and friends in our programming toolbox makes our life a little easier. Nothing so novel here! But it can save us the time of implementing a tree-like algorithm.