
require(ggplot2)
require(plyr)
require(grid) # contains the arrow function
require(biOps)
require(doMC) # for parallel code
require(EBImage)
## To install EBImage
# source("http://bioconductor.org/biocLite.R")
# biocLite("EBImage")

# start parallel environment
registerDoMC()
# functions for converting images back and forth
im.to.df<-function(in.img) {
    out.im<-expand.grid(x=1:nrow(in.img),y=1:ncol(in.img))
    out.im$val<-as.vector(in.img)
    out.im
}
df.to.im<-function(in.df,val.col="val",inv=F) {
  in.vals<-in.df[[val.col]]
  if(class(in.vals[1])=="logical") in.vals<-as.integer(in.vals*255)
  if(inv) in.vals<-255-in.vals
  out.mat<-matrix(in.vals,nrow=length(unique(in.df$x)),byrow=F)
  attr(out.mat,"type")<-"grey"
  out.mat
}
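# Usage sketch (added illustration, not part of the original chunk): round-trip
# a small synthetic 5x5 grayscale matrix through the two converters above
test.img<-matrix(runif(25),nrow=5,ncol=5)
img.df<-im.to.df(test.img)      # data frame with columns x, y, val
recon.img<-df.to.im(img.df)     # back to a matrix image
stopifnot(isTRUE(all.equal(as.vector(recon.img),as.vector(test.img))))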
ddply.cutcols<-function(...,cols=1) {
  # run standard ddply command
  cur.table<-ddply(...)
  cutlabel.fixer<-function(oVal) {
    sapply(oVal,function(x) {
      cnv<-as.character(x)
      mean(as.numeric(strsplit(substr(cnv,2,nchar(cnv)-1),",")[[1]]))
    })
  }
  cutname.fixer<-function(c.str) {
    s.str<-strsplit(c.str,"(",fixed=T)[[1]]
    t.str<-strsplit(paste(s.str[c(2:length(s.str))],collapse="("),",")[[1]]
    paste(head(t.str,-1),collapse=",")
  }
  for(i in c(1:cols)) {
    cur.table[,i]<-cutlabel.fixer(cur.table[,i])
    names(cur.table)[i]<-cutname.fixer(names(cur.table)[i])
  }
  cur.table
}
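# Usage sketch (added illustration): group made-up data by a binned column;
# ddply.cutcols converts the "(0.2,0.4]" style labels into numeric bin
# midpoints and cleans up the column name (here it becomes "x")
test.df<-data.frame(x=runif(100),val=rnorm(100))
binned.df<-ddply.cutcols(test.df,.(cut(x,5)),summarise,mean.val=mean(val))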

Quantitative Big Imaging

author: Kevin Mader
date: 8 May 2014
width: 1440
height: 900
transition: rotate
css: ../template.css

Scaling Up / Big Data

Course Outline

Literature / Useful References

Big Data


Cluster Computing

Databases

Cloud Computing

Outline


Motivation

There are three different types of problems that we will run into.

Really big data sets


Many datasets

Exploratory Studies

Example Projects

Zebra fish Full Animal Phenotyping

Full adult animal at cellular resolution: 1000s of samples of full adult animals, imaged at 0.74 \(\mu m\) resolution. Images of 11500 x 2800 x 628 voxels \(\longrightarrow\) 20-40 GVx / sample


Brain Project

Whole brain with cellular resolution: 1 \(cm^3\) scanned at 1 \(\mu m\) resolution \(\longrightarrow\) 1000 GVx / sample

What is wrong with usual approaches?

Normally, problems are approached by solving the single task at hand as quickly as possible:

im_in=imread('test.jpg');
im_filter=medfilt2(im_in,[5,5]);
cl_img=bwlabel(im_filter>10);
cl_count=hist(cl_img(:),1:100);
dlmwrite('out.txt',cl_count)

You have to rewrite everything, every time

If you start with a bad approach, it is very difficult to fix; big data and reproducibility must be considered from the beginning

Computer Science Principles

Disclosure: there are entire courses, PhD theses, and companies devoted to this, so this is just a quick introduction

What is parallelism?

Parallelism is when you can divide a task into separate pieces. Some tasks are easy to parallelize while others are very difficult. Rather than focusing on programming, consider a few real-life examples as indicators of difficulty.

  1. You have 10 friends who collectively know all the capital cities of the world.

  2. Each friend has some money with them.
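To make the parallelism idea concrete in R: the setup code above already registers a doMC backend, so an embarrassingly parallel loop only needs its iterations to be independent of one another. A minimal sketch with stand-in per-sample work (the function here is just a placeholder):

require(foreach) # loaded with doMC; repeated here for clarity
# placeholder for real per-sample work (e.g. filtering one image)
process.sample<-function(i) mean(matrix(runif(1e4),100,100))
serial.res  <-foreach(i=1:8) %do%    process.sample(i) # one core
parallel.res<-foreach(i=1:8) %dopar% process.sample(i) # all registered cores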

What is distributed computing?

Distributed computing is very similar to parallel computing, but a bit more particular. Parallel means you process many tasks at the same time, while distributed means you are no longer on the same CPU, process, or even on the same machine.

Being distributed has some important implications: once you are no longer on the same machine, factors like network delay, file-system issues, and other users become major problems.

Parallel Challenges

Coordination

Parallel computing requires a significant amount of coordination between computers for tasks that are not easily parallelizable.

Mutability

The second major issue is mutability: if you have two cores or computers trying to write the same information at the same time, the result is no longer deterministic (not good)

Blocking

The simple act of taking turns and waiting for every independent process to take its turn can completely negate the benefits of parallel computing

Distributed Challenges

Distributed computing inherits all of the problems of parallel programming and adds a whole variety of new issues.

Sending Instructions / Data Afar

Fault Tolerance

If you have 1000 computers working on solving a problem and one fails, you do not want your whole job to crash

Data Storage

How can you access and process data from many different computers quickly without very expensive infrastructure?

Imperative Programming

Directly coordinating tasks on a computer.


Making a soup

  1. Buy vegetables at market
  2. then Buy meat at butcher
  3. then Chop carrots into pieces
  4. then Chop potatoes into pieces
  5. then Heat water
  6. then Wait until boiling then add chopped vegetables
  7. then Wait 5 minutes and add meat

Declarative


Making a soup

Comparison

They look fairly similar, so what is the difference? The second is needlessly complicated for one person, but what if you have a team? How can several people make an imperative soup faster (chopping vegetables together)?

Imperative soup

  1. Buy {carrots, peas, tomatoes} at market
  2. then Buy meat at butcher
  3. then Chop carrots into pieces
  4. then Chop potatoes into pieces
  5. then Heat water
  6. then Wait until boiling then add chopped vegetables
  7. then Wait 5 minutes and add meat

How can many people make a declarative soup faster? Give everyone a different task (not completely efficient since some tasks have to wait on others)

Declarative soup

Results

Imperative


Declarative

Lazy Evaluation

Organization

One of the major challenges of scaling up experiments and analysis is keeping all of the results organized in a clear manner. As we have seen in the last lectures, many of the analyses produce a large number of text files.

Queue Computing

Queue processing systems (like Sun Grid Engine, Oracle Grid Engine, Apple Xgrid, Condor, etc.) are used to manage the resources of a cluster and schedule the jobs submitted to it.


Resources

Structure of Cluster

Master (or Name) Node(s)

The node with which every other node communicates, the main address.

Worker Nodes

The nodes where the computation is performed.

Scheduler

The actual process that decides which jobs will run using which resources (worker nodes, memory, bandwidth) at which time

Databases

A database is a collection of data stored in the format of tables. A table consists of a number of columns and rows.

Animals

Here we have a table of the animals measured in an experiment and their weights

kable(data.frame(
                 id=c(1,2,3),
                 Weight=c(100,40,80)
                 ))
| id | Weight |
|----|--------|
| 1  | 100    |
| 2  | 40     |
| 3  | 80     |

Cells

The Cells table then contains an analysis looking at the cellular structures inside each animal

kable(data.frame(
  Animal=c(1,1,2),
  Type=c("Cancer","Healthy","Cancer"),
  Anisotropy=c(0.5,1.0,0.5), 
  Volume=c(1,2,0.95)
  ))
| Animal | Type    | Anisotropy | Volume |
|--------|---------|------------|--------|
| 1      | Cancer  | 0.5        | 1.00   |
| 1      | Healthy | 1.0        | 2.00   |
| 2      | Cancer  | 0.5        | 0.95   |

Relational databases can store relationships between different tables, so the relationship between Animal in table Cells and id in table Animals can be preserved.

library(igraph)
node.names<-c("Cells","Animals","Type","Anisotropy","Volume","Animal","id","Weight")
c.mat<-matrix(0,length(node.names),length(node.names))
colnames(c.mat)<-node.names
rownames(c.mat)<-node.names
c.mat["Cells","Animal"]<-1
c.mat["Cells","Type"]<-1
c.mat["Cells","Anisotropy"]<-1
c.mat["Cells","Volume"]<-1
c.mat["Animals","id"]<-1
c.mat["Animals","Weight"]<-1
c.mat["Animal","id"]<-1
g<-graph.adjacency(c.mat,mode="directed")
V(g)$degree <- degree(g)
V(g)$label <- V(g)$name
V(g)$color <- "lightblue"
V(g)["Cells"]$color<-"red"
V(g)["Animals"]$color<-"red"
V(g)["Cells"]$frame.width<-4 
V(g)["Animals"]$frame.width<-4
V(g)$size<-50
E(g)$width<-2
E(g)$curved<-F
E(g)$arrow.mode<-1
E(g)[7]$width<-5
E(g)[7]$curved<-T
E(g)[7]$arrow.mode<-3
E(g)$color<-"black"
cut.seq<-function(...,length.out,rem.edges=1) {
  out.seq<-seq(...,length.out=length.out+2*rem.edges)
  out.seq[c((rem.edges+1):(length.out+rem.edges))]
}
tf<-function(x) as.matrix(
  data.frame(x=c(0,0,rep(x,6)),
  y=c(-x,x,cut.seq(-2*x,0,length.out=4),cut.seq(0,2*x,length.out=2))))

plot(g,  layout=tf(20))# layout.kamada.kawai) #layout=layout.circle)#
(Figure: graph showing the Cells and Animals tables linked through the Animal and id columns)

SQL

SQL (pronounced Sequel) stands for Structured Query Language and is nearly universal for both searching (called querying) and adding (called inserting) data in databases. SQL is used in various forms, from Firefox storing its preferences locally (using SQLite) to Facebook storing some of its user information (MySQL and Hive). Referring to the two tables we defined above, we can use SQL to get information about the tables independently of how they are stored (on a single machine, a supercomputer, or in the cloud)


Basic queries

SELECT Volume FROM Cells \[ \rightarrow \begin{bmatrix} 1,2,0.95\end{bmatrix} \]

SELECT AVG(Volume) FROM Cells WHERE Type = "Cancer" \[ \rightarrow 0.975 \]

We could have done these easily without SQL using Excel, Matlab or R
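For example, with the Cells table above held as an R data frame, the same two queries are one-liners (a small sketch, with the data frame rebuilt so the snippet stands alone):

Cells<-data.frame(Animal=c(1,1,2),
                  Type=c("Cancer","Healthy","Cancer"),
                  Anisotropy=c(0.5,1.0,0.5),
                  Volume=c(1,2,0.95))
Cells$Volume                               # SELECT Volume FROM Cells
mean(Cells$Volume[Cells$Type=="Cancer"])   # SELECT AVG(Volume) ... WHERE Type = "Cancer" -> 0.975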

More Advanced SQL

SELECT ATable.Weight,CTable.Volume FROM Animals as ATable 
  INNER JOIN Cells as CTable on (ATable.id=CTable.Animal)

\[ \rightarrow \begin{bmatrix} 100 & 1.00 \\ 100 & 2.00 \\ 40 & 0.95 \end{bmatrix} \]
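The same join can be expressed in R with merge() (a sketch reusing the two tables from the slides):

Animals<-data.frame(id=c(1,2,3),Weight=c(100,40,80))
Cells<-data.frame(Animal=c(1,1,2),Type=c("Cancer","Healthy","Cancer"),
                  Anisotropy=c(0.5,1.0,0.5),Volume=c(1,2,0.95))
# INNER JOIN on Animals.id = Cells.Animal, keeping Weight and Volume
merge(Animals,Cells,by.x="id",by.y="Animal")[,c("Weight","Volume")]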

Networks using SQL

If we expand our SQL example to cellular networks with an additional table explicitly describing the links between cells

node.names<-c("1","2","3")
c.mat<-matrix(0,length(node.names),length(node.names))
colnames(c.mat)<-node.names
rownames(c.mat)<-node.names
c.mat["1","2"]<-800
c.mat["1","3"]<-40
c.mat["3","1"]<-300
g<-graph.adjacency(c.mat,mode="directed",weighted=T)
V(g)$degree <- degree(g)
V(g)$label <- V(g)$name
V(g)$color <- "lightblue"
V(g)$size<-50
E(g)$width<-2
E(g)$curved<-T
E(g)$arrow.mode<-1
E(g)$color<-"black"
plot(g,  layout=layout.kamada.kawai) #layout=layout.circle)#
(Figure: directed network of cells 1-3 with weighted links)


Table Representation

kable(data.frame(
                 id1=c(1,1,3),
                 id2=c(2,3,1),
                 No.Juncs=c(800,40,300)
                 ))
| id1 | id2 | No.Juncs |
|-----|-----|----------|
| 1   | 2   | 800      |
| 1   | 3   | 40       |
| 3   | 1   | 300      |

Now to calculate how many connections each cell has

SELECT id,COUNT(*) AS connection_count FROM Cells 
  INNER JOIN Network ON (id=id1 OR id=id2)
  GROUP BY id

\[ \rightarrow \begin{bmatrix} 1 & 3 \\ 2 & 1 \\ 3 & 2 \end{bmatrix} \]
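The same per-cell count can be read off the Network table directly in R (a sketch using the table above):

Network<-data.frame(id1=c(1,1,3),id2=c(2,3,1),No.Juncs=c(800,40,300))
# count how often each cell id appears at either end of a connection
table(c(Network$id1,Network$id2))
##  1  2  3
##  3  1  2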

Beyond SQL: NoSQL

Basic networks can be entered and queried using SQL, but relatively simple-sounding requests can get complicated very quickly

How many cells are within two connections of each cell?

SELECT CellsA.id,COUNT(*) AS connection_count FROM Cells as CellsA
  INNER JOIN Network as NetA ON (CellsA.id=NetA.id1)
  INNER JOIN Network as NetB ON (NetA.id2=NetB.id1)
  GROUP BY CellsA.id

This is still readable but quickly becomes cumbersome and difficult to manage
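For comparison, a graph library answers the two-connection question in a single call; a sketch using igraph on the cell network g built above (the counts exclude the starting cell itself):

# cells reachable within two connections of each cell;
# neighborhood.size counts the starting vertex too, hence the -1
neighborhood.size(g,order=2,mode="all")-1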


NoSQL (Not Only SQL)

A new generation of database software extends the functionality of SQL to allow for more scalability (e.g. MongoDB) or for specific problem domains like networks and graphs (generally called graph databases)

Definition: Big Data

Velocity, Volume, Variety

When a ton of heterogeneous data is coming in fast.

Performant, scalable, and flexible

When scaling isn't scary

10X, 100X, 1000X is the same amount of effort

When you are starving for enough data

The director of the AMPLab said their rate-limiting factor is always having enough interesting data

O 'clicks' per sample

A brief oversimplified story

Google ran into 'big data' and its associated problems years ago: peta- and exabytes of websites to collect and make sense of. Google uses an algorithm called PageRank(tm) for evaluating the quality of websites. They could probably have used existing tools if PageRank were some magic program that could read and determine the quality of a site on its own:

for every_site_on_internet
  current_site.rank=secret_pagerank_function(current_site)
end

Just divide all the websites into a bunch of groups and have each computer run a group, easy!

PageRank

While the actual internals of PageRank are not public, the general idea is that sites are ranked based on how many sites link to them

for current_site in every_site_on_internet
  current_pagerank = new SecretPageRankObj(current_site);
  for other_site in every_site_on_internet
    if current_site is_linked_to other_site
      current_pagerank.add_site(other_site);
    end
  end
  current_site.rank=current_pagerank.rank();
end

How do you divide this task?

It gets better

Google's Solution: MapReduce (part of it)

Some people claim to have had the idea before; Google was certainly the first to do it at scale

Several engineers at Google recognized common elements in many of the tasks being performed. They then proceeded to divide all tasks into two classes: Map and Reduce

Map

Map is where a function is applied to every element in the list and the function depends only on exactly that element \[ \vec{L} = \begin{bmatrix} 1,2,3,4,5 \end{bmatrix} \] \[ f(x) = x^2 \] \[ map(f \rightarrow \vec{L}) = \begin{bmatrix} 1,4,9,16,25 \end{bmatrix} \]
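The same map written in R (a one-line sketch of the formula above):

L<-c(1,2,3,4,5)
f<-function(x) x^2
sapply(L,f)   # 1 4 9 16 25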


Reduce

Reduce is more complicated and involves aggregating a number of different elements and summarizing them. For example the \(\Sigma\) function can be written as a reduce function \[ \vec{L} = \begin{bmatrix} 1,2,3,4,5 \end{bmatrix} \] \[ g(a,b) = a+b \] Reduce then applies the function to the first two elements, and then to the result of the first two with the third and so on until all the elements are done \[ reduce(g \rightarrow \vec{L}) = g(g(g(g(1,2),3),4),5) \]
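And the reduce step in R, using the built-in Reduce function (a sketch matching the formula above):

L<-c(1,2,3,4,5)
g<-function(a,b) a+b
Reduce(g,L)   # g(g(g(g(1,2),3),4),5) = 15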

MapReduce

They designed a framework for handling the distribution and execution of these types of jobs on clusters. For each job, a dataset (\(\vec{L}\)), a Map-task (\(f\)), a grouping, and a Reduce-task (\(g\)) are specified.


All of the steps in between can be written once in a robust, safe manner and then reused for every task that can be described using this MapReduce paradigm. Such a task description \(\langle \vec{L}, f(x), g(a,b) \rangle\) is referred to as a job.

Key-Value Pairs / Grouping

The initial example was very basic; for more complicated jobs, the notion of key-value (KV) pairs must be introduced. A KV pair is made up of a key and a value. A key must be comparable / hashable (a number, string, immutable list of numbers, etc.) and is used for grouping data. The value is the information associated with this key.

Counting Words

Using MapReduce on a folder full of text-documents.


L = ["cat dog car",
  "dog car dog"]

\[ \downarrow \textbf{ Map } : f(x) \]

[("cat",1),("dog",1),("car",1),("dog",1),("car",1),("dog",1)]

\[ \downarrow \textrm{ Shuffle / Group} \]

"cat": (1)
"dog": (1,1,1)
"car": (1,1)

\[ \downarrow \textbf{ Reduce } : g(a,b) \]

[("cat",1),("dog",3),("car",2)]

Hadoop

Hadoop is the open-source implementation of MapReduce developed by Yahoo and released as an Apache project. It provides the underlying infrastructure and filesystem that handle storing and distributing the data, so that each machine stores some of the data locally and processing jobs run where the data is stored.

Spark / Resilient Distributed Datasets

Technical Specifications

Zaharia, M., et al. (2012). Resilient distributed datasets: a fault-tolerant abstraction for in-memory cluster computing


Practical Specification

Near Future Imaging Goals

Needs

Would be nice


map.data.dir<-"/Users/mader/Dropbox/WorkRelated/Papers/MapReduce3D/ext-data"
pmd<-function(...,sep="") paste(map.data.dir,...,sep="/")
 read.fcn<-function(filename) {
  in.table<-read.csv(filename,header=F)
  col.names<-c("Type","Master","iters","x","y","z","MapTime","RunTime")
  if(ncol(in.table)>10) col.names<-c(col.names,"mapsteps","nodes","persist","compress")
  names(in.table)<-col.names
  ext.nodes<-function(x) as.numeric(substr(x,7,nchar(x)-1))
  if(ncol(in.table)<10) {
    in.table$nodes<-sapply(as.character(in.table$Master),ext.nodes)
    in.table$persist<-0
    in.table$compress<-F
  }
  in.table$storage<-sapply(in.table$persist,function(x) 
    switch(x+1,"Memory","Memory (S)","Memory-Disk","Memory-Disk (S)","Only Disk")
  )
  in.table$compress<-as.logical(in.table$compress)
  in.table$compression<-ifelse(in.table$compress,"compressed","uncompressed")
  fc<-function(x,width=4,...) formatC(x,width=width,...)
  in.table$dim<-with(in.table,paste(fc(x),fc(y),fc(z),sep=","))
  in.table
}

distfilter.csv<-read.fcn(pmd("dist_filter.csv"))
fit.data<-distfilter.csv
fit.data$voxpersec<-with(fit.data,iters*as.numeric(x)*y*z/(RunTime/1000))

filter.lm<-lm(voxpersec~(nodes),data=fit.data)
predict.table<-data.frame(name=c("Kevin's Laptop","GWS-3","Beamline Standard","Beamline Full","Merlin Full","CSCS Monte Rosa"),nodes=c(2,20,60,12*6+20,30*12+16,47872),exp="Filter")
predict.table$est.voxpersec<-predict(filter.lm,predict.table)

goal.table<-data.frame(name=c("1","10\n Gigafrost","0.001\n15 min/scan\n","0.0002\n20 scans/day\n"),
                       gvph=(2560*2560*2160)/1e9*c(3600,10*3600,4,20/24),
                       x.start=min(predict.table$nodes),x.end=max(predict.table$nodes))
ggplot(predict.table,aes(x=nodes,y=est.voxpersec*3600/1e9)
       )+
  geom_segment(data=goal.table,aes(color=name,x=x.start,xend=x.end,y=gvph,yend=gvph),alpha=0.8,size=2)+
  geom_line(color="black")+
  geom_point(aes(shape=name),size=5)+
  
  labs(x="Nodes Used",y="Gigavoxels per hour",shape="Cluster",color="Goal\nOps/s",title="Log-Log")+
  scale_y_log10()+scale_x_log10()+theme_bw(15)
Tomcat Goals

ggplot(predict.table,aes(x=nodes,y=est.voxpersec*3600/1e9)
       )+
  geom_segment(data=goal.table,aes(color=name,x=0,xend=376,y=gvph,yend=gvph),alpha=0.8,size=2)+
  geom_line(color="black")+
  geom_point(aes(shape=name),size=5)+
  labs(x="Nodes Used",y="Gigavoxels per hour",shape="Cluster",color="Goal\nOps/s",title="Linear Scale")+
  xlim(0,400)+ylim(0,200)+theme_bw(15)
Tomcat Goals

Post-processing: Statistical Analysis


If we do that, we miss a lot!

data.dir<-"/Users/mader/Dropbox/WorkRelated/Papers/iwbbio2014"
pd<-function(...,sep="") paste(data.dir,...,sep="/")
# read and correct the coordinate system
# Routine for Connecting to the Database
raw.summary<-read.csv(pd("summary.csv"))
# fix column names
raw.names<-names(raw.summary)
names(raw.summary)<-sapply(raw.names,function(x) gsub("[.]1","_SD",x))
raw.summary<-subset(raw.summary,VOLUME>0 & VOLUME<1000 & count<100000 & count>10000) # filter out the corrupt files

mean.nona<-function(...) mean(...,na.rm=T)
sd.nona<-function(...) sd(...,na.rm=T)
imp.phenotypes<-names(raw.summary)[c(9:11,15,19,23,24:30,35,38,42)]
imp.phenotypes<-names(raw.summary)[c(15,19,23,29,35,38,42,44,45)]
var.pheno<-function(in.df,in.col) {
  sd.col<-paste(in.col,"_SD",sep="")
  mean.vals<-in.df[,in.col]
  sd.vals<-in.df[,sd.col]
  ie.var<-c(sd.nona(mean.vals)/mean.nona(abs(mean.vals)))*100
  ia.var<-c(mean.nona(sd.vals)/mean.nona(abs(mean.vals)))*100
  data.frame(Phenotype=c(in.col),
             Within.Sample.CV=ia.var,
             Between.Sample.CV=ie.var,
             Ratio=ia.var/ie.var*100)   
}
out.table<-ldply(imp.phenotypes,function(col.name) var.pheno(raw.summary,col.name))
names(out.table)<-c("Phenotype","Within","Between","Ratio (%)")
out.table$Phenotype<-c("Length","Width","Height","Volume","Nearest Canal Distance","Density (Lc.Te.V)","Nearest Neighbors (Lc.DN)","Stretch (Lc.St)","Oblateness (Lc.Ob)")
kable(out.table)
| Phenotype | Within (CV %) | Between (CV %) | Ratio (%) |
|-----------|---------------|----------------|-----------|
| Length | 36.97 | 4.279 | 864.1 |
| Width | 27.92 | 4.734 | 589.9 |
| Height | 25.15 | 4.636 | 542.5 |
| Volume | 67.85 | 12.479 | 543.7 |
| Nearest Canal Distance | 70.35 | 333.395 | 21.1 |
| Density (Lc.Te.V) | 144.40 | 27.658 | 522.1 |
| Nearest Neighbors (Lc.DN) | 31.86 | 1.835 | 1736.1 |
| Stretch (Lc.St) | 13.98 | 2.360 | 592.5 |
| Oblateness (Lc.Ob) | 141.27 | 18.465 | 765.1 |

The first two columns of the table show the within-sample and between-sample variation (as CV, in %) for selected phenotypes; the last column shows the ratio of the within- to the between-sample values

Visualizing the Variation

How does this result look visually? Each line shows the mean \(\pm\) standard deviation for one sample. The range within a single sample is clearly much larger than the variation between samples

ggplot(raw.summary,
       aes(x=as.numeric(as.factor(VOLUME)), y=VOLUME,
           ymin=1e9*VOLUME-1e9*VOLUME_SD,
           ymax=1e9*VOLUME+1e9*VOLUME_SD))+
  geom_errorbar()+geom_point()+
  theme_bw(25)+
  labs(x="Sample",y=expression(paste("Volume (",um^3,")")))
Partial Volume Effect

Do not condense

With the ability to scale to millions of samples, there is no need to condense. We can analyze the entire dataset in real time and try to identify groups or trends within the whole data instead of relying on just a few precalculated views of it.

Practical

1276 comma-separated text files, each with 56 columns of data and 15-60 thousand rows
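A hypothetical sketch of the 'Load and Preprocess' step on a single machine, reading the files in parallel through the doMC backend registered earlier (the directory name and file pattern are placeholders):

# read every per-sample CSV into one large data frame, in parallel
csv.files<-list.files("cell_results",pattern="[.]csv$",full.names=TRUE)
all.cells<-ldply(csv.files,read.csv,.parallel=TRUE)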

results.df<-data.frame(Task=c("Load and Preprocess","Single Column Average","1 K-means Iteration"),
                       Single.Time=c("360 minutes","4.6s","2 minutes"),
                       Spark.Time=c("10 minutes","400ms","1s"))
names(results.df)[2]<-"Single Core Time"
names(results.df)[3]<-"Spark Time (40 cores)"
kable(results.df)
| Task | Single Core Time | Spark Time (40 cores) |
|------|------------------|-----------------------|
| Load and Preprocess | 360 minutes | 10 minutes |
| Single Column Average | 4.6s | 400ms |
| 1 K-means Iteration | 2 minutes | 1s |

We can iteratively explore and hypothesis-test with the data quickly

We found several composite phenotypes which are more consistent within samples than between samples

Rich, heavily developed platform

Available Tools

Tools are built for table-like data structures and are much better adapted to them.

Commercial Support

Dozens of major companies (Apple, Google, Facebook, Cisco, ...) donate over $30M a year to development of Spark and the Berkeley Data Analytics Stack

Academic Support

Beyond: Streaming

Post-processing goals

Streaming

It can handle static data or live data coming in from a 'streaming' device like a camera for real-time analysis. The exact same code can be used for real-time and static analysis.

Scalability

Connect more computers.

Start workers on these computers.

Beyond: Approximate Results

Projects at AMPLab like Spark and BlinkDB are moving towards approximate results.

For real-time image processing it might be the only feasible solution and could drastically reduce the amount of time spent on analysis.