Showing posts with label monte carlo. Show all posts
Showing posts with label monte carlo. Show all posts

Friday, 9 February 2024

Exact reproducibility of stochastic simulations for parallel and serial algorithms simultaneously
Random Stream Chunking

Preamble 

Figure: Visual description of
random stream chunking, M.Suzen (2017)
The advent of using computational sciences approaches, i.e., data science and machine learning, in the industry becomes more common practice in almost all organisations due to the democratisation of data science tools and availability of inexpensive cloud infrastructure. This brings the requirement or even compulsory practice of a code being so called parallelised. Note that parallelisation is used as an umbrella term here for using multiple compute resources in accelerating otherwise a serial computation and could mean a distributed or multi-core computing, i.e., multiple CPUs/GPUs. Here we provide a simple yet a very powerful approach to preserve reproducibility of parallel and serial implementation of the same algorithm that uses random numbers, i.e., stochastic simulations. We assume Single Instruction Multiple Data (SIMD) setting. 

Terminology on repeatability, reproducibility and  replicability 

Even though we only use reproducibility as a term as an umbrella term, there are clear distinctions, recommended by ACM, here. We summarise this from computational science perspective :

repeatability : Owner re-run the code to produce the same results on own environment. 
reproducibility: Others can  re-run the code to produce the same results on other's environment. 
replicability: Others can re-code the same thing differently and produce the same results on other's environment. 

In the context of this post, since parallel and serial settings would constitute different environments, the practice of getting the same results, this falls under reproducibility.   

Single Instruction Multiple Data (SIMD) setting. 

This is the most used, and probably the only one you would ever need, approach in parallel processing. It implies using the same instruction, i.e., algorithm or function/module, for the distinct portions of the data. This originates from applied mathematics techniques so called domain decomposition method

Simultaneous Random Stream Chunking 

The approach in ensuring exact reproducibility of a stochastic algorithm in both serial and parallel implementation lies in default chunking in producing the random numbers both in serial and parallel code. This approach used in the Bristol Python package Here is the mathematical definition. 

Defintion Random Stream Chunking Given a random number generator $\mathscr{R}(s_{k})$ with as seed $s_{k}$ is used over $k$-partitions. These partitions are always corresponds to $k$ datasets, MD portion of SIMD. In the case of parallel algorithms, each $k$ corresponds to a different compute resource     $\mathscr{C}_{k}$. 

By this definition we ensure that both parallel and serial processing receiving exactly the same random number, this is summarised in the Figure.

Conclusion

We outline a simple yet effective way of ensuring exact reproducibility of serial and parallel simulations simultaneously. However, reproducibility of stochastic simulations are highly hardware dependent as well, such as GPUs and NPUs, and their internal streams might not be that easy to control partitions, but generic idea presented here should be applicable.


Please cite this article as: 

 @misc{suezen24rep, 
     title = {Exact reproducibility of stochastic simulations for parallel and serial algorithms simultaneously}, 
     howpublished = {\url{https://memosisland.blogspot.com/2024/02/exact-reproducibility-of-stochastic.html}, 
     author = {Mehmet Süzen},
     year = {2024}
}
  



Sunday, 18 June 2017

Pitfalls in pseudo-random number sampling at scale with Apache Spark

With kind contributions from Gregory Piatetsky-Shapiro, KDnuggets

In many data science applications and in academic research, techniques involving Bayesian Inference is now used commonly. One of the basic operation in Bayesian Inference techniques is drawing instances from given statistical distribution. This of course well known pseudo-random number sampling. Most commonly used methods first generates uniform random number and mapping that into distribution of interest via cumulative function (CDF) of it, i.e., Box-Mueller transform.

Large scale simulation are now possible, due to highly stable computational frameworks that can scale well. One of the unique framework is Apache Spark due to its distributed data structure supporting fault tolerance, called Resilient Distributed Data (RDD). Here is a simple way to generate one million Gaussian Random numbers and generating an RDD:

1
2
3
4
5
6
// Generate 1 million Gaussian random numbers
import util.Random
Random.setSeed(4242)
val ngauss = (1 to 1e6.toInt).map(x => Random.nextGaussian)
val ngauss_rdd = sc.parallelize(ngauss)
ngauss_rdd.count // 1 million

One unrealistic part of the above code example is that you may want to generate huge number of samples that won't fit in single memory, ngauss variable above.  Luckily, there are set of library functions one can use to generate random data as an RDD from mllib, see randomRDD. But for the remainder of this post, we will use our home made random RDD.
Figure: Scaling of execution time with increasing size,
with or without re-partitioning.

Concept of Partitions

As RDDs are distributed data structures, the concept of partition comes into play (link)..  So, you need to be careful of the size of partitions in RDDs. Recently I posed a question about this in Apache Spark mailing list (link)(gist). If you reduce the data size, take good care that your partition size reflects this, so to speak avoiding huge performance reduction. Unfortunately, Spark does not provide an automated  out of box solution optimising partition size. The actual data items that might reduce during your analysis pipeline. A reduced RDD will inherit partition size of its parent and this may be a limiting issue.

As you might have already guessed, RDDs are great tool in doing large scale analysis but they won't provide you a free lunch. Let's do a small experiment.

Hands on Experiment

Going back to our original problem of using Spark in Bayesian inference algorithms, it is common to operate on samples via certain procedure. And those procedures, let's say an operator, highly likely that it will reduce the number of elements in the sample. One example would be applying a cut-off or a constrained in the CDF, which essential the definition of it, probability of random variable $x > x_{0}$.  As seen in Figure, we have generated random RDDs up to 10 million numbers and measure the wall-clock time of `count` operation,  which occurs after a filter operation that reduces the number of items considerably. See codes in the Appendix. As a result, in Figure, we have identified 3 different regions, depending on data size, 

  1.  Small Data: Re-partitioning does not play a role in the performance.
  2.  Medium Size: Re-partitioning gives up to order of magnitude better performance.
  3.  Big Data: Re-partitioning gives a constant performance improvement, up to 3 times better, and  the improvement is drifting, meaning it will be more significant larger the data size. 
Conclusion

Spark provides a superb API to develop high quality Data Science solutions. However, programming with Spark and designing algorithms requires optimisation of different aspects of the RDD workflow. Here, we only demonstrate only dramatic effect of re-partitioning after a simple operation in the walk clock time. Hence, it is advised to have a benchmark identifying under which circumstances your data pipeline produce different wall clock behaviour before going into production.

Appendix

Entire code base can be cloned from github  (here).

Spark Benchmark

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/*
     Generating Gaussian numbers : Performance with repartition
     RDD partition size retains from the parent

     (c) 2017
     GPLv3

     Author: Mehmet Suzen (suzen at acm dot org)

     Run this script on spark-shell
     spark-shell --executor-cores 4
     spark-shell> :load gaussian_random_filter.scala

 */

import util.Random     // normal
import breeze.linalg._ // Linear Algebra objects and csvwrite
import java.io.File    // File io

/*

   Generate gaussian random numbers manually without mllib
   Benchmark repartition

 */
// Random numbers to generate
val Ns = Array(1e3, 1e4, 5e4, 1e5, 5e5, 1e6, 2e6, 4e6, 8e6, 1e7)
val benchMat = DenseMatrix.zeros[Double](10,3)
Random.setSeed(4242)
for(i <- 0 to Ns.size-1) {
   println("running for " + Ns(i))
   // Generate random RDD size Ns
   var ngauss     = (1 to Ns(i).toInt).map(x=>Random.nextGaussian)
   var ngauss_rdd = sc.parallelize(ngauss)
   var ngauss_rdd2 = ngauss_rdd.filter(x=>x > 4.0)
   // An operation without repartition
   var t0 = System.nanoTime()
   var c1 = ngauss_rdd2.count
   var t1 = System.nanoTime()
   var e1 = (t1 - t0)/1e9 // seconds
   // An operation with repartition
   var ngauss_rdd3 = ngauss_rdd2.repartition(1)
   t0 = System.nanoTime()
   var c2 = ngauss_rdd3.count
   t1 = System.nanoTime()
   var e2 = (t1 - t0)/1e9
   benchMat(i,::) := DenseVector[Double](Ns(i), e1, e2).t
}

/* Record the benchmark results */
csvwrite(new File("bench.csv"), benchMat)

Plotting code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 
#  Plot the benchmark from Spark Gaussian Random numbers
# 
#  (c) 2017
#  GPLv3
#
library(grid)
library(gridExtra)
library(ggplot2)
library(reshape)
bench_df           <- read.csv2("bench.csv", header=FALSE, sep=",")
colnames(bench_df) <- c("N", "no", "yes")
bench_df2          <- reshape2:::melt(bench_df,
                                      measure.vars=c("no","yes"))
colnames(bench_df2) <- c("N", "repartion", "time")
bench_df2$N    <-  as.numeric(as.vector(bench_df2$N))
bench_df2$time <-  as.numeric(as.vector(bench_df2$time))
gt <-  theme(
             panel.background = element_blank(), 
             axis.text.x      = element_text(face="bold", color="#000000", size=11),
             axis.text.y      = element_text(face="bold", color="#000000", size=11),
             axis.title.x     = element_text(face="bold", color="#000000", size=11),
             axis.title.y     = element_text(face="bold", color="#000000", size=11)
            )  
p1                 <- ggplot(bench_df2,
    aes(x=N, y=time, colour=repartion)) + 
               geom_smooth(formula="y~x", span=0.3) + xlab("Number of random draws") + ylab("Wall Clock (Seconds)") +
                             ggtitle("Effect of repartition in count: Gaussian Random Numbers") + 
                             gt
grid.newpage()
footnote <- "(c) 2017, Mehmet Suzen : http://memosisland.blogspot.de/"
g <- arrangeGrob(p1, bottom = textGrob(footnote, x = 0, hjust = -0.1, vjust=0.1, gp = gpar(fontface = "italic", fontsize = 12)))

png(file="spark_repartition_random.png")
grid.draw(g)
dev.off()

Friday, 7 December 2012

UEFA Champions League Knockout Phase Draws: Monte Carlo Simulation with R

Draws for the knockout phase of the 2012–13 UEFA Champions League will be held in Nyon on the 20th December 2012. The rules of the draw are simple and are as follows:
  • 8 Group winner teams will be seeded.
  • 8 Group runner-up teams will be unseeded.
  • Teams coming from the same group and from same association can not be drawn against.
One can compute the probabilities of the likely outcomes by using ensemble average, a standard way of finding the frequencies where we identify whole sample space. Here we take a different approach and use time averages. What we do is to run large number of draws (20 million times) using randomly generated draws. I have called this exercise a Monte Carlo simulation, rather a simple sort while there is no fancy sampling of any sort, only rejection of pair up or the given draw if the last remaining pair does not satisfy the rules. A statistical language GNU R is used to generate the likely outcome. We use Mersenne Twister RNG, a default random number generator which is known to have very large cycle.   In result we basically count the likely pair-ups in a matrix (matrix of pairs) and find the frequency of each likely pair up. Simulation takes 3-4 hours on a single CPU Intel Xeon with 16 GB RAM running Ubuntu GNU/Linux. Well, to be honest I haven't optimized the code.

Qualified teams, their groups and associations definitions, the simulation and results reporting R code are as follows (note that we use xtable for html outputs, for other functions we use, see below codes after tables):


winnerTeams               <- c('Paris-Saint-Germain', 'Schalke-04', 'Malaga', 'Borussia-Dortmund', 'Juventus', 'Bayern-Munich', 'Barcelona', 'Manchester-United');
winnerAssociation         <- c('FR', 'DE', 'ES', 'DE', 'IT', 'DE', 'ES', 'ENG');
runnersUpTeams            <- c('Porto', 'Arsenal', 'Milan', 'Real-Madrid', 'Shakhtar-Donetsk', 'Valencia', 'Celtic', 'Galatasaray');
runnersUpTeamsAssociation <- c('PT', 'ENG', 'IT', 'ES', 'UA', 'ES', 'SCO', 'TR');
countMatrix <- matrix(0, 8, 8)
row.names(countMatrix) <- winnerTeams;
colnames(countMatrix) <- runnersUpTeams;
many <- 20e6;
system.time(drawMany(winnerTeams, winnerAssociation, runnersUpTeams, runnersUpTeamsAssociation, countMatrix, many))
countMatrix <- countMatrix/many;
print(countMatrix);

Simulations results can be interpreted based on frequencies (probabilities) of pairings, rather intuitively while probabilities are not that far off . For example if we consider Barcelona, Milan and Arsenal score the largest 0.23 and 0.21.  So my guess based on these frequencies, which I select maximums first ,then the second maximum and so on. If all ties I select the highest count using the second table.  Here are the predicted pairs, (ordered with highest probability):

Barcelona - Milan
Malaga - Arsenal
Bayern Munich - Real Madrid
Borissia Dordmund - Valencia
Manchester United - Celtic
Juventus - Galatasaray
Schalke 04 - Porto
PSG - Donetsk

Note that predicted pairs are quite depending on your selection strategy.
Table for the frequencies:


Porto Arsenal Milan Real-Madrid Shakhtar-Donetsk Valencia Celtic Galatasaray
Paris-Saint-Germain 0.00 0.13 0.14 0.18 0.12 0.18 0.12 0.12
Schalke-04 0.12 0.00 0.15 0.19 0.12 0.19 0.13 0.12
Malaga 0.19 0.23 0.00 0.00 0.19 0.00 0.20 0.19
Borussia-Dortmund 0.13 0.14 0.15 0.00 0.13 0.19 0.13 0.13
Juventus 0.13 0.15 0.00 0.22 0.00 0.22 0.14 0.13
Bayern-Munich 0.13 0.14 0.15 0.19 0.13 0.00 0.13 0.13
Barcelona 0.18 0.21 0.23 0.00 0.19 0.00 0.00 0.18
Manchester-United 0.13 0.00 0.16 0.22 0.13 0.22 0.14 0.00

Table for the counts (actually all are integers):

Porto Arsenal Milan Real-Madrid Shakhtar-Donetsk Valencia Celtic Galatasaray
Paris-Saint-Germain 0.00 2589164.00 2897024.00 3658337.00 2356581.00 3658892.00 2494247.00 2345755.00
Schalke-04 2348458.00 0.00 2924099.00 3735314.00 2371245.00 3743246.00 2517610.00 2360028.00
Malaga 3781019.00 4506559.00 0.00 0.00 3845872.00 0.00 3997679.00 3868871.00
Borussia-Dortmund 2500221.00 2819591.00 3098797.00 0.00 2542013.00 3863888.00 2646626.00 2528864.00
Juventus 2659773.00 2983255.00 0.00 4393444.00 0.00 4392158.00 2892103.00 2679267.00
Bayern-Munich 2500835.00 2818331.00 3098181.00 3866890.00 2542742.00 0.00 2647340.00 2525681.00
Barcelona 3620075.00 4283100.00 4684023.00 0.00 3721268.00 0.00 0.00 3691534.00
Manchester-United 2589619.00 0.00 3297876.00 4346015.00 2620279.00 4341816.00 2804395.00 0.00

This is the main simulation function:


drawMany <- function(winnerTeams, winnerAssociation, runnersUpTeams, runnersUpTeamsAssociation, countMatrix, many) {
 for(i in 1:many) {
  repeat {
     dr          <- drawOneKnock(winnerTeams, winnerAssociation, runnersUpTeams, runnersUpTeamsAssociation,0);
     if(sum(dr) > 0) break;
   }
  updateCount <- mapply(incMatrix, dr[,1], dr[,2])
 }
}

A single draw can be generated as follows:


drawOneKnock <- function(winnerTeams, winnerAssociation, runnersUpTeams, runnersUpTeamsAssociation, names=1) {
    k=1
  repeat {
    k=k+1;
    if(k > 1000) return(-1);
    blockWin = 1:8     ; # tracking for draw
    blockRun = blockWin;
    winners  = c(); # Draw results
    runners  = c();
    for(i in 1:7) {
         kk =1;
      repeat {
        kk=kk+1;
        if(kk > 1000) return(-1);
        winner <- sample(blockWin, 1);
        runner <- sample(blockRun, 1);
        if(!(runner == winner) && !(winnerAssociation[winner] == runnersUpTeamsAssociation[runner])) {
          break;
        }
      }
      blockWin <- blockWin[-which(blockWin == winner)];
      blockRun <- blockRun[-which(blockRun == runner)];
      winners  <- c(winners, winner);
      runners  <- c(runners, runner);
     }
      winner <- blockWin;
      runner <- blockRun;
      # check if last remaining is ok, otherwise invalidate draw
      if(!(runner == winner) && !(winnerAssociation[winner] == runnersUpTeamsAssociation[runner])) {
        winners  <- c(winners, blockWin);
        runners  <- c(runners, blockRun);
        if(names)  dr <- cbind(winnerTeams[winners], runnersUpTeams[runners]);
        if(!names) dr <- cbind(winners, runners);
        break;
      }
   }
  dr
}

And counting the pair-ups is performed by a simple function:


incMatrix <- function(i, j) {
   countMatrix[i,j] <<- countMatrix[i,j]+1;
   return(0);
 }
(c) Copyright 2008-2024 Mehmet Suzen (suzen at acm dot org)

Creative Commons License
This work is licensed under a Creative Commons Attribution 4.0 International License