Showing posts with label MPI. Show all posts

Sunday, 18 June 2017

Pitfalls in pseudo-random number sampling at scale with Apache Spark

With kind contributions from Gregory Piatetsky-Shapiro, KDnuggets

In many data science applications and in academic research, techniques involving Bayesian inference are now commonly used. One of the basic operations in Bayesian inference is drawing samples from a given statistical distribution. This is, of course, the well-known problem of pseudo-random number sampling. The most commonly used methods first generate a uniform random number and then map it to the distribution of interest. One way is to apply the inverse of the distribution's cumulative distribution function (CDF), known as inverse transform sampling. For Gaussian variates, the Box-Muller transform is a common alternative.
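For reference, the two mappings mentioned above have the following standard textbook forms; these formulas are not from the original post. Inverse transform sampling applies the inverse CDF to a single uniform draw. It is shown here for an exponential distribution with rate $\lambda$, chosen purely as an illustration. Box-Muller turns two independent uniforms into two independent standard normals without inverting the Gaussian CDF.

```latex
% Inverse transform sampling: U ~ Uniform(0,1), F the target CDF
X = F^{-1}(U), \qquad \text{e.g. } F(x) = 1 - e^{-\lambda x} \;\Rightarrow\; X = -\frac{\ln(1 - U)}{\lambda}

% Box-Muller transform: U_1, U_2 ~ Uniform(0,1] independent
Z_0 = \sqrt{-2 \ln U_1}\,\cos(2\pi U_2), \qquad
Z_1 = \sqrt{-2 \ln U_1}\,\sin(2\pi U_2), \qquad
Z_0, Z_1 \sim \mathcal{N}(0, 1) \text{ independent}
```

The `Random.nextGaussian` call used in this post delegates to `java.util.Random`. Its documentation describes the polar method of Box, Muller and Marsaglia, a rejection-based variant of the transform above that avoids the trigonometric functions.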

Large-scale simulations are now possible thanks to highly stable computational frameworks that scale well. One such framework is Apache Spark, notable for its fault-tolerant distributed data structure, the Resilient Distributed Dataset (RDD). Here is a simple way to generate one million Gaussian random numbers and create an RDD from them:

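// Generate 1 million Gaussian random numbers (run in spark-shell, where sc is predefined)
import util.Random
Random.setSeed(4242)
// The samples are generated on the driver, then distributed as an RDD
val ngauss = (1 to 1e6.toInt).map(x => Random.nextGaussian())
val ngauss_rdd = sc.parallelize(ngauss)
ngauss_rdd.count // 1 million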

One unrealistic part of the above code example is the in-memory ngauss variable: you may want to generate a huge number of samples that won't fit in the memory of a single machine. Luckily, mllib provides a set of library functions for generating random data directly as an RDD; see RandomRDDs. For the remainder of this post, however, we will use our home-made random RDD.
Figure: Scaling of execution time with increasing data size, with and without re-partitioning.

Concept of Partitions

As RDDs are distributed data structures, the concept of partitions comes into play (link). So you need to be careful about the size of the partitions in your RDDs. I recently posed a question about this on the Apache Spark mailing list (link)(gist). If you reduce the data size, take good care that your partitioning reflects this, so as to avoid a huge performance reduction. Unfortunately, Spark does not provide an automated, out-of-the-box solution for optimising partition size as the number of data items shrinks during your analysis pipeline. A reduced RDD inherits the number of partitions of its parent, so each partition may end up holding very few items, and this may become a limiting issue.

As you might have already guessed, RDDs are a great tool for large-scale analysis, but they don't come as a free lunch. Let's do a small experiment.

Hands on Experiment

Going back to our original problem of using Spark in Bayesian inference algorithms: it is common to operate on samples via a certain procedure, let's call it an operator, and such an operator is quite likely to reduce the number of elements in the sample. One example is applying a cut-off, a constraint on the tail of the distribution. It keeps only draws with $x > x_{0}$, which occur with probability $1 - F(x_{0})$, where $F$ is the CDF. As seen in the Figure, we generated random RDDs of up to 10 million numbers and measured the wall-clock time of a `count` operation. The `count` runs after a filter operation that reduces the number of items considerably. See the code in the Appendix. As a result, we identified three different regions in the Figure, depending on data size:

  1.  Small data: re-partitioning does not play a role in the performance.
  2.  Medium size: re-partitioning gives up to an order of magnitude better performance.
  3.  Big data: re-partitioning gives a steady improvement of up to 3 times. The improvement is drifting upwards, meaning it becomes more significant as the data size grows.
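To see how drastic the filter in the benchmark is, note that it keeps only standard normal draws above $x_{0} = 4$. The tail probability and the expected number of survivors follow from the standard normal CDF $\Phi$. Here $K$ denotes the number of partitions the filtered RDD inherits from its parent; the post does not fix this value.

```latex
p = P(x > 4) = 1 - \Phi(4) \approx 3.17 \times 10^{-5}

\mathbb{E}[\text{survivors}] = N p \approx
\begin{cases}
0.03 & N = 10^{3} \\
32   & N = 10^{6} \\
317  & N = 10^{7}
\end{cases}
\qquad
\mathbb{E}[\text{items per partition}] \approx \frac{N p}{K}
```

Even at the largest size, the `count` therefore runs over only a few hundred surviving items spread across all $K$ inherited partitions. This is the situation the experiment targets with re-partitioning.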
Conclusion

Spark provides a superb API for developing high-quality data science solutions. However, programming with Spark and designing algorithms requires optimising different aspects of the RDD workflow. Here, we have demonstrated only one thing: the dramatic effect on wall-clock time of re-partitioning after a simple operation. Hence, it is advisable to benchmark your data pipeline before going into production, to identify the circumstances under which it shows different wall-clock behaviour.

Appendix

The entire code base can be cloned from GitHub (here).

Spark Benchmark

/*
     Generating Gaussian numbers: performance with repartition
     An RDD retains the partitioning of its parent

     (c) 2017
     GPLv3

     Author: Mehmet Suzen (suzen at acm dot org)

     Run this script in spark-shell:
     spark-shell --executor-cores 4
     spark-shell> :load gaussian_random_filter.scala

 */

import util.Random     // Gaussian draws via Random.nextGaussian
import breeze.linalg._ // Linear algebra objects and csvwrite
import java.io.File    // File I/O

/*

   Generate Gaussian random numbers manually, without mllib,
   and benchmark count with and without repartition

 */
// Sample sizes to generate
val Ns = Array(1e3, 1e4, 5e4, 1e5, 5e5, 1e6, 2e6, 4e6, 8e6, 1e7)
// One row per sample size: N, time without repartition, time with repartition
val benchMat = DenseMatrix.zeros[Double](Ns.length, 3)
Random.setSeed(4242)
for (i <- Ns.indices) {
   println("running for " + Ns(i))
   // Generate a random RDD of size Ns(i) (samples are created on the driver)
   val ngauss      = (1 to Ns(i).toInt).map(_ => Random.nextGaussian())
   val ngauss_rdd  = sc.parallelize(ngauss)
   // Keep only the far tail: very few items survive, but the parent's partitions remain
   val ngauss_rdd2 = ngauss_rdd.filter(x => x > 4.0)
   // An operation without repartition
   var t0 = System.nanoTime()
   val c1 = ngauss_rdd2.count
   var t1 = System.nanoTime()
   val e1 = (t1 - t0)/1e9 // seconds
   // The same operation after repartitioning into a single partition
   val ngauss_rdd3 = ngauss_rdd2.repartition(1)
   t0 = System.nanoTime()
   val c2 = ngauss_rdd3.count
   t1 = System.nanoTime()
   val e2 = (t1 - t0)/1e9 // seconds
   benchMat(i, ::) := DenseVector[Double](Ns(i), e1, e2).t
}

/* Record the benchmark results */
csvwrite(new File("bench.csv"), benchMat)

Plotting code

#
#  Plot the benchmark from Spark Gaussian random numbers
#
#  (c) 2017
#  GPLv3
#
library(grid)
library(gridExtra)
library(ggplot2)
library(reshape2)
# bench.csv has no header: N, time without repartition, time with repartition
bench_df            <- read.csv("bench.csv", header=FALSE)
colnames(bench_df)  <- c("N", "no", "yes")
# Long format: one row per (N, repartition, time)
bench_df2           <- melt(bench_df, id.vars="N", measure.vars=c("no","yes"))
colnames(bench_df2) <- c("N", "repartition", "time")
bench_df2$N    <- as.numeric(bench_df2$N)
bench_df2$time <- as.numeric(bench_df2$time)
gt <-  theme(
             panel.background = element_blank(),
             axis.text.x      = element_text(face="bold", color="#000000", size=11),
             axis.text.y      = element_text(face="bold", color="#000000", size=11),
             axis.title.x     = element_text(face="bold", color="#000000", size=11),
             axis.title.y     = element_text(face="bold", color="#000000", size=11)
            )
p1 <- ggplot(bench_df2, aes(x=N, y=time, colour=repartition)) +
      geom_smooth(formula=y~x, method="loess", span=0.3) +
      xlab("Number of random draws") + ylab("Wall Clock (Seconds)") +
      ggtitle("Effect of repartition in count: Gaussian Random Numbers") +
      gt
footnote <- "(c) 2017, Mehmet Suzen : http://memosisland.blogspot.de/"
g <- arrangeGrob(p1, bottom = textGrob(footnote, x = 0, hjust = -0.1, vjust=0.1,
                                       gp = gpar(fontface = "italic", fontsize = 12)))

png(filename="spark_repartition_random.png")
grid.newpage()
grid.draw(g)
dev.off()

Thursday, 24 July 2014

Distributed processing with BASH via LSF' blaunch

LSF is currently a de facto standard for managing computing queues. Many inexpensive CPU or GPU clusters use LSF. The majority of current academic HPC codes use MPI for their core computations. The core MPI codes parallelise and distribute the computational tasks, but the LSF batch job script, which usually invokes commands via the shell, still runs serially. See here. This may become a bottleneck when the batch script needs to run a large set of commands in sequence that could instead be run in a distributed way.

Idle CPUs

An example task: your parallel code generates a large set of output files that need further processing, such as compressing all the data files produced by the processes. You launch $n$ processes, but once the MPI code completes, the compression of the $n$ data files is carried out by a single process. This not only leaves $n-1$ CPUs waiting idle, it also makes this step take $n$ times longer in wall-clock time, because the compression is performed sequentially. To overcome this limitation, LSF provides a tool called blaunch for distributing tasks. The documentation provided by IBM is really fantastic; however, it lacks a simple example for novice users. In this post I provide a simple example that compresses $n$ data files using $n$ distributed processes.
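Here is a back-of-the-envelope version of this argument. It assumes that each of the $n$ files takes the same time $t_c$ to compress on one core and ignores launch overhead; both are simplifying assumptions, and the numbers in the last line are hypothetical.

```latex
T_{\text{serial}} = n\, t_c, \qquad T_{\text{blaunch}} \approx t_c

\text{idle slot time during the serial step} = (n - 1)\, n\, t_c

% Hypothetical numbers: n = 64, t_c = 30 s
T_{\text{serial}} = 64 \times 30\,\text{s} = 1920\,\text{s} = 32\,\text{min}, \qquad
T_{\text{blaunch}} \approx 30\,\text{s}, \qquad
63 \times 1920\,\text{s} = 120960\,\text{s} \approx 33.6 \text{ slot-hours idle}
```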

Job distribution via blaunch

The first step is to figure out which hosts the LSF job is running on. These can be obtained from the $LSB_HOSTS environment variable, which lists each host once per allocated slot. The second step is to write this information into a hosts file that blaunch can read. The compression script must be provided in a separate file. This is an important point, because the whole concept of blaunch is that it distributes the given command to all available processes. Note that LSF runs your job script itself, not in each of the distributed host processes, at least as far as the user can see. Knowing this, we can only tell the distributed processes apart outside of our job submission script, using $LSF_PM_TASKID.

Here is a possible portion of the job script, using BASH:
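cd /mydata
rm -f hosts.txt                     # start from a fresh hosts file
myhosts=($LSB_HOSTS)                # bash array: one entry per allocated slot
for hostName in "${myhosts[@]}"; do echo "$hostName" >> hosts.txt; done
blaunch -u hosts.txt compress.sh    # launch compress.sh on the hosts listed in hosts.txt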

As you can see, we don't use $LSF_PM_TASKID in the job script. Instead, it is used in the compression script, which is launched in each of the distributed processes:


#!/bin/bash
# compress.sh: launched by blaunch in each distributed process
trajF=($(find . -name '*.dat'))     # all data files, as a bash array
ii=$(( LSF_PM_TASKID - 1 ))         # task IDs start at 1, bash arrays at 0
echo "${trajF[$ii]}"
xz -4e --format=lzma "${trajF[$ii]}"

We assume that these commands are available in the BASH environment on every host. Note that we use a bash array to identify the file names, so your output naming convention should match this identification.
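The array lookup in compress.sh relies on two things: every task must see the same file ordering, and there must be at least as many files as tasks. A slightly more defensive variant is sketched below. The helper name `file_for_task` is hypothetical and, like the original script, it assumes `LSF_PM_TASKID` starts at 1. It sorts the file list so the ordering does not depend on `find`, and it picks the line whose number matches the task ID.

```shell
#!/usr/bin/env bash
# Hypothetical helper for compress.sh: print the data file assigned to this task.
# Assumes LSF_PM_TASKID is 1-based (as the original script does) and that all
# tasks see the same *.dat files under the given directory.
file_for_task() {
  local dir="${1:-.}"
  local task="${LSF_PM_TASKID:-0}"
  # Reject a missing, zero or non-numeric task ID.
  [[ "$task" =~ ^[0-9]+$ ]] && (( task >= 1 )) || return 1
  # Sorting gives every task the same ordering, whatever order find returns;
  # line number $task of the sorted list is this task's file.
  local f
  f=$(find "$dir" -name '*.dat' | sort | sed -n "${task}p")
  # More tasks than files: nothing for this task to do.
  [[ -n "$f" ]] || return 1
  printf '%s\n' "$f"
}
```

In compress.sh this could be used as `f=$(file_for_task .) && xz -4e --format=lzma "$f"`, so that surplus tasks exit quietly instead of running `xz` without a valid file.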

Conclusion

We have provided a simple working example of launching parallel shell-script tasks using blaunch. A similar approach is presented here. An alternative approach could be to use GNU Parallel, which is discussed here.


(c) Copyright 2008-2024 Mehmet Suzen (suzen at acm dot org)

Creative Commons License
This work is licensed under a Creative Commons Attribution 4.0 International License