Salmon Run: Clustering Medical Procedure Codes with Scalding

A colleague pointed out that there exists an inverse use-case to finding outliers in medical claims. This one is to group procedure codes into clusters, or what Health Insurance companies call Episode Treatment Groups (ETG). Essentially an ETG is a way to cluster a group of services (procedures) into a medically relevant unit.

The CMS.gov dataset provides slightly under 16 million anonymized outpatient claims for Medicare/Medicaid patients. Each outpatient record can have upto 6 ICD-9 procedure codes, upto 10 ICD-9 diagnosis codes and upto 45 HCPCS codes. So just like the outlier case, we can derive a measure of similarity between a pair of codes as the average co-occurrence within claims across the dataset.

I decided to use a variant of the DBSCAN clustering algorithm. This post provides some tips on how to implement DBSCAN in a distributed manner – I used the ideas in this post to develop my implementation. The intuition behind my clustering algorithm goes something like this.

We calculate the similarity sAB between a pair of codes A and B as the number of times they co-occur in the corpus. Clustering algorithms need a distance measure, so we treat the distance dAB as the reciprocal of their similarity, ie 1/sAB. The DBSCAN clustering algorithm works by selecting other points around each point that are within a specified distance ε from each other. Candidate cluster centroids are those that have at least MinPoints codes within this distance ε. My algorithm deviates from DBSCAN at this point – instead of finding density-reachable codes I just find the Top-N densest clusters. Density is calculated as the number of codes within a circular area of the mean radius, i.e. N2 / πΣi=0..Nd2. We then calculate the top N densest code clusters – these are our derived ETGs.

The Scalding code below does just this. We simplify a bit by not calculating using some constants such as π but otherwise the code is quite faithful to the algorithm described above.

// Source: src/main/scala/com/mycompany/cmspp/cluster/CodeCluster.scala
package com.mycompany.cmspp.clusters

import com.twitter.scalding.Job
import com.twitter.scalding.Args
import com.twitter.scalding.TextLine
import com.twitter.scalding.Tsv
import scala.io.Source

class CodeCluster(args: Args) extends Job(args) {

  def extractPairs(line: String): List[(String,String)] = {
    val cols = line.split(",").toList
    val codes = (cols.slice(22, 27)  // ICD9 procedure code cols
      .map(x => if (x.isEmpty) x else "ICD9:" + x) 
      ::: cols.slice(31, 75)         // HCPCS (CPT4) procedure cols
      .map(x => if (x.isEmpty) x else "HCPCS:" + x))          
      .filter(x => (! x.isEmpty))
    val cjoin = for {codeA <- codes; codeB <- codes} yield (codeA, codeB)
    cjoin.filter(x => x._1 < x._2)
  }

  val Epsilon = args("epsilon").toDouble
  val MinPoints = args("minpoints").toInt
  val NumClusters = args("nclusters").toInt
  
  val output = Tsv(args("output"))

  val dists = TextLine(args("input"))
    .read
    // compute pair-wise distances between procedure codes
    .flatMapTo('line -> ('codeA, 'codeB)) { line: String => extractPairs(line) }
    .groupBy('codeA, 'codeB) { group => group.size('sim) }
    .map('sim -> 'radius) { x: Int => (1.0D / x) }
    .discard('sim)
    // group by codeA and retain only records which are within epsilon distance
    .groupBy('codeA) { group => group.sortBy('radius).reverse }
    .filter('radius) { x: Double => x < Epsilon }
    
  val codeCounts = dists
    .groupBy('codeA) { group => 
      group.sizeAveStdev('radius -> ('count, 'mean, 'std)) 
    }
    // only retain codes that have at least MinPoints points within Epsilon
    .filter('count) { x: Int => x > MinPoints }
    .discard('std)

  val densities = dists.joinWithSmaller(('codeA -> 'codeA), codeCounts)
    .map(('mean, 'count) -> 'density) { x: (Double,Int) => 
      1.0D * Math.pow(x._2, 2) / Math.pow(x._1, 2)
    }
    .discard('radius, 'count)
    
  // sort the result by density descending and find the top N clusters
  val densestCodes = densities.groupAll { group => 
      group.sortBy('density).reverse }
    .unique('codeA)
    .limit(NumClusters)
    
  // join code densities with densest codes to find final clusters
  densities.joinWithTiny(('codeA -> 'codeA), densestCodes)
    .groupBy('codeA) { group => group.mkString('codeB, ",")}
    .write(output)
}

object CodeCluster {
  def main(args: Array[String]): Unit = {
    // populate redis cache
    new CodeCluster(Args(List(
      "--local", "",
      "--epsilon", "0.3",
      "--minpoints", "10",
      "--nclusters", "10",
      "--input", "data/outpatient_claims.csv",
      "--output", "data/clusters.csv"
    ))).run
    Source.fromFile("data/clusters.csv")
      .getLines()
      .foreach(Console.println(_))
  }
}

I ran this locally with 1 million claims (out of the 16 million claims in my dataset) and got results like this:

And thats all I have for today. I’d like to point out a new book on Scalding, Programming MapReduce with Scalding by Antonios Chalkiopoulos. I was quite impressed by this book, you can read my review on Amazon if you are interested.