Quantitative Big Imaging

Kevin Mader
8 May 2014

ETHZ: 227-0966-00L

Scaling Up / Big Data

Exercises

Tools for the Exercise

  • Basic Exercises
    • Matlab
    • Spark (Advanced)

Objectives

  1. Make a basic job using Condor
  2. Make a parameter sweep using Condor
  3. Use a 'big data' tool for basic image processing

Setting up the Computers

Once you have logged in

  1. Install FIJI (if it is not already installed)
    • FIJI can be downloaded from the FIJI website
  2. Start FIJI
  3. Start RStudio by typing rstudio at the command line

Creating a Scripted Analysis

We now have a macro which runs once (see the starting macro movie); how can we modify it to perform a parameter sweep analysis?

  • Use a loop in the ImageJ macro language, as in the example below
  • Make sure you save each file with a different name
  • Make sure you select the image window before doing any additional processing
for (threshold=90; threshold<110; threshold++) {
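      // select the original image window before each pass (see the note above)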
      selectWindow("blobs.gif");
      setThreshold(threshold, 255);
      run("3D Objects Counter", "threshold="+threshold+" slice=0 min.=1 max.=65024 statistics");
      outFileName=getDirectory("home")+File.separator+"blob_analysis_th_"+threshold+".txt";
      print("Saving Results as:"+outFileName);
      saveAs("Results", outFileName);
}

Create a new Macro

Use these starting macros and scripts

  1. Plugins \( \rightarrow \) New \( \rightarrow \) Macro
  2. Language \( \rightarrow \) ImageJ Macro
  3. Type or paste code in and run

Condor Scripts

Condor (like Sun Grid Engine) is a queue processing system. It manages a cluster (a large collection of computers connected by a network and sharing some, if not all, storage) and distributes tasks to the individual machines. A job (task) is a single unit of work to be executed, containing information on where the data is located and what should be done with it.
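
A job is described to Condor in a small submit description file. A minimal, purely illustrative sketch (the script name, argument, and file names here are hypothetical, not the files used in the demos below):

# hypothetical submit description file (illustration only)
Universe   = vanilla
Executable = myscript.sh
Arguments  = 95
Output     = myscript.out
Error      = myscript.err
Log        = myscript.log
Queue

condor_submit reads such a file and places the job in the queue.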

Condor at ITET

The instructions and specific details for Condor at ITET are given in the D-ITET documentation (for simplicity we will not precompile our Matlab jobs, but in the future you should).

Basic Commands

  • Submit a job condor_submit
  • Check the status of jobs condor_q
  • Delete a job condor_rm
  • Delete all jobs condor_rm -all

First Demo Script

The demo script is provided by D-ITET and can be run with the commands below. Note that jobs cannot be run from the scratch folder.

cd ~/
git clone https://gist.github.com/a49814356c7e707bb0dc.git
cd a49814356c7e707bb0dc
chmod +x mandelbrot.sh
condor_submit mandelbrot.condor

Cell Colony Demo Script

A script to try a number of thresholds on the cell colony image

cd ~/
git clone https://gist.github.com/c96640bdbd8961ef9ec7.git
cd c96640bdbd8961ef9ec7
chmod +x filterandthreshold.sh
condor_submit batchimage.condor

Adaptations

  1. Make the script perform different filters instead of thresholds
  2. How would you run this on multiple files? (one possible approach is sketched below)
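
Condor can queue several copies of the same job from one submit file: $(Process) is replaced by 0, 1, 2, ... in each copy, which can be used to select an input file or a parameter value. The sketch below is hypothetical (the argument convention for filterandthreshold.sh is assumed, not taken from the gist):

# hypothetical sweep over several input images (illustration only)
Universe   = vanilla
Executable = filterandthreshold.sh
Arguments  = image_$(Process).jpg
Output     = out_$(Process).txt
Error      = err_$(Process).txt
Log        = sweep.log
Queue 5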

Install Spark (Advanced)

cd /scratch
curl -o spark.tgz http://d3kbcqa49mib13.cloudfront.net/spark-0.9.1-bin-hadoop1.tgz
tar -xvf spark.tgz
cd spark-0.9.1-bin-hadoop1/

Starting Spark

Spin up your own cluster in an hour. Here we only use Spark on one node, acting as the master, scheduler, and worker, but normally it is run on many different computers.

  • Start the Spark-Shell ./bin/spark-shell
  • Start Spark-python ./bin/pyspark
    • Write code in Python

Getting an image to Key-Value Format

library(jpeg)
in.img<-readJPEG("c96640bdbd8961ef9ec7/Cell_Colony.jpg")
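# im.to.df is assumed to be a helper (not shown here) that flattens the
# image matrix into rows of x, y, and val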
kv.img<-im.to.df(in.img)
write.table(kv.img,"cell_colony.csv",row.names=F,col.names=F,sep=",")
kable(head(kv.img))
x y val
1 1 0.6275
2 1 0.7804
3 1 0.8863
4 1 0.8980
5 1 0.9098
6 1 0.9216

The key is the position \( \langle x, y \rangle \) and the value is the intensity \( val \).

Loading the data into Spark (Scala)

val rawImage=sc.textFile("cell_colony.csv")
val imgAsColumns=rawImage.map(_.split(","))
val imgAsKV=imgAsColumns.map(point => ((point(0).toInt,point(1).toInt),point(2).toDouble))
  • Count the number of pixels
imgAsKV.count
  • Get the first value
imgAsKV.take(1)
  • Sample a fraction (10%) of the values from the data
imgAsKV.sample(true,0.1,0).collect

Perform a threshold

val threshVal=0.5
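// keep only the pixels darker than the threshold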
val labelImg=imgAsKV.filter(_._2<threshVal)
  • Runs on 1 core on your laptop or on 1000 cores in the cloud, on Merlin, or at the beamline.
  • If one computer crashes or disconnects, the work automatically continues on another one.
  • If one part of the computation is taking too long, it will be sent to other computers to finish.
  • If a computer runs out of memory, it writes the remaining results to disk and continues running (graceful drop-off in performance).

Get Volume Fraction

100.0*labelImg.count/(imgAsKV.count)

Region of Interest

Take a region of interest between 0 and 100 in X and Y

def roiFun(pvec: ((Int,Int),Double)) = {
  pvec._1._1>=0 & pvec._1._1<100 &  // X
  pvec._1._2>=0 & pvec._1._2<100    // Y
}
val roiImg=imgAsKV.filter(roiFun)

Perform a 3x3 box filter

def spread_voxels(pvec: ((Int,Int),Double), windSize: Int = 1) = {
  val wind=(-windSize to windSize)
  val pos=pvec._1
  val scalevalue=pvec._2/(wind.length*wind.length)
  for(x<-wind; y<-wind) 
    yield ((pos._1+x,pos._2+y),scalevalue)
}

val filtImg=roiImg.
      flatMap(cvec => spread_voxels(cvec)).
      filter(roiFun).reduceByKey(_ + _)
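
Note that dividing by wind.length*wind.length assumes all nine neighbours fall inside the region of interest; at the border fewer contribute, so the filtered values there are biased low. A minimal sketch of one way to renormalize (contribFrac and normFiltImg are hypothetical names, not part of the exercise):

// spread a constant 1.0 with the same function to count, per pixel,
// the fraction of the 3x3 window that actually contributed
val contribFrac=roiImg.
      map(pvec => (pvec._1,1.0)).
      flatMap(cvec => spread_voxels(cvec)).
      filter(roiFun).reduceByKey(_ + _)
// divide the box sum by that fraction to recover the true local mean
val normFiltImg=filtImg.join(contribFrac).
      mapValues{case (boxSum,frac) => boxSum/frac}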

Setting up Component Labeling

  • Create the first labels from a thresholded image as a mutable type
val xWidth=100
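// the initial label is a unique id derived from the pixel position;
// the Boolean flag marks a real (foreground) voxel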
var newLabels=labelImg.map(pvec => (pvec._1,(pvec._1._1.toLong*xWidth+pvec._1._2+1,true)))
  • Spreading to Neighbor Function
def spread_voxels(pvec: ((Int,Int),(Long,Boolean)), windSize: Int = 1) = {
  val wind=(-windSize to windSize)
  val pos=pvec._1
  val label=pvec._2._1
  for(x<-wind; y<-wind) 
    yield ((pos._1+x,pos._2+y),(label,(x==0 & y==0)))
}

Running Component Labeling

var groupList=Array((0L,0))
var running=true
var iterations=0
while (running) {
  newLabels=newLabels.
  flatMap(spread_voxels(_,1)).
    reduceByKey((a,b) => ((math.min(a._1,b._1),a._2 | b._2))).
    filter(_._2._2)
  // make a list of each label and how many voxels are in it
  val curGroupList=newLabels.map(pvec => (pvec._2._1,1)).
    reduceByKey(_ + _).sortByKey(true).collect
  // if the list isn't the same as before, continue running since we need to wait for swaps to stop
  running = (curGroupList.deep!=groupList.deep)
  groupList=curGroupList
  iterations+=1
  print("Iter #"+iterations+":"+groupList.mkString(","))
}
groupList

Calculating From Images

  • Average Voxel Count
val labelSize = newLabels.
  map(pvec => (pvec._2._1,1)).
  reduceByKey((a,b) => (a+b)).
  map(_._2)
labelSize.reduce((a,b) => (a+b))*1.0/labelSize.count
  • Center of Volume for Each Label
val labelPositions = newLabels.
  map(pvec => (pvec._2._1,pvec._1)).
  groupBy(_._1)
def posAvg(pvec: Seq[(Long,(Int,Int))]): (Double,Double) = {
  val sumPt=pvec.map(_._2).reduce((a,b) => (a._1+b._1,a._2+b._2))
  (sumPt._1*1.0/pvec.length,sumPt._2*1.0/pvec.length)
}
print(labelPositions.map(pvec=>posAvg(pvec._2)).mkString(","))

Lazy evaluation

  • No execution starts until you save results to disk or otherwise require output (an action such as count, collect, or saveAsTextFile)
  • Spark automatically deconstructs the pipeline and optimizes the jobs it runs, so computation is not wasted outside of the region of interest (even though we applied it last); a small sketch follows
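
A minimal sketch (lazyThresh is a hypothetical name; imgAsKV is the RDD loaded earlier): the filter below returns immediately because it only records the transformation, and nothing is read or computed until an action asks for output.

// transformation: only the lineage is recorded, no data is processed yet
val lazyThresh=imgAsKV.filter(_._2 < 0.5)
// actions: these trigger execution of the whole pipeline
lazyThresh.count
lazyThresh.saveAsTextFile("thresholded_pixels")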