The CMS.gov dataset provides slightly under 16 million anonymized outpatient claims for Medicare/Medicaid patients. Each outpatient record can have up to 6 ICD-9 procedure codes, up to 10 ICD-9 diagnosis codes, and up to 45 HCPCS codes. So, just like the outlier case, we can derive a measure of similarity between a pair of codes from their co-occurrence within claims across the dataset.
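To make the similarity measure concrete, here is a minimal single-machine sketch, with made-up claims, of the co-occurrence count; the Scalding job later in this post computes the same quantity at scale.

// toy claims: each claim is the set of procedure codes appearing on it
val claims = List(
  Set("ICD9:4019", "HCPCS:99213", "HCPCS:80053"),
  Set("ICD9:4019", "HCPCS:99213"),
  Set("ICD9:2724", "HCPCS:80053", "HCPCS:99213"))

// emit each unordered pair of codes once per claim, then count the
// number of claims in which each pair co-occurs
val sims: Map[(String, String), Int] = claims
  .flatMap(codes => for (a <- codes; b <- codes if a < b) yield (a, b))
  .groupBy(identity)
  .map { case (pair, occurrences) => pair -> occurrences.size }

// e.g. sims(("HCPCS:99213", "ICD9:4019")) == 2, so dAB = 1.0 / 2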
I decided to use a variant of the DBSCAN clustering algorithm. This post provides some tips on how to implement DBSCAN in a distributed manner; I used those ideas to develop my implementation. The intuition behind my clustering algorithm goes something like this.
We calculate the similarity sAB between a pair of codes A and B as the number of times they co-occur in the corpus. Clustering algorithms need a distance measure, so we treat the distance dAB as the reciprocal of the similarity, i.e. dAB = 1/sAB. The DBSCAN clustering algorithm works by finding, for each point, the other points that lie within a specified distance ε of it. Candidate cluster centroids are those codes that have at least MinPoints codes within this distance ε. My algorithm deviates from DBSCAN at this point: instead of finding density-reachable codes, I just find the Top-N densest clusters. Density is calculated as the number of codes n within ε divided by the area of a circle whose radius is the mean distance to those codes, i.e. n / (π * r^2), where r = (1/n) * Σi=1..n di. The Top-N densest code clusters are our derived ETGs.
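As a sanity check on the density formula, here is a small single-machine sketch of the per-code neighborhood test and density computation, using toy distance and threshold values rather than real claim data.

// toy density computation for a single candidate code
val Epsilon = 0.3      // neighborhood radius threshold
val MinPoints = 3      // minimum neighbors for a candidate centroid

// distances d = 1/similarity from this code to its co-occurring codes
val dists = List(0.05, 0.1, 0.2, 0.25, 0.5, 1.0)

val neighbors = dists.filter(_ < Epsilon)   // codes within Epsilon
val n = neighbors.size
if (n >= MinPoints) {
  val meanRadius = neighbors.sum / n
  // density = neighbor count / area of circle with the mean radius
  val density = n / (math.Pi * meanRadius * meanRadius)
  println(f"n=$n meanRadius=$meanRadius%.3f density=$density%.1f")
}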
The Scalding code below does just this. It simplifies a bit, dropping the constant π and ranking by count^2 / mean^2 rather than the exact density, but otherwise the code is quite faithful to the algorithm described above.
// Source: src/main/scala/com/mycompany/cmspp/clusters/CodeCluster.scala
package com.mycompany.cmspp.clusters
import com.twitter.scalding.Job
import com.twitter.scalding.Args
import com.twitter.scalding.TextLine
import com.twitter.scalding.Tsv
import scala.io.Source
class CodeCluster(args: Args) extends Job(args) {
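// parse one outpatient claim line and emit every unordered pair of
// (namespaced) procedure codes that co-occur on that claim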
def extractPairs(line: String): List[(String,String)] = {
val cols = line.split(",").toList
val codes = (cols.slice(22, 28) // 6 ICD9 procedure code cols
.map(x => if (x.isEmpty) x else "ICD9:" + x)
::: cols.slice(31, 76) // 45 HCPCS (CPT4) procedure cols
.map(x => if (x.isEmpty) x else "HCPCS:" + x))
.filter(x => (! x.isEmpty))
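// cross join the claim's codes with themselves; keeping only pairs
// with codeA < codeB emits each unordered pair exactly once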
val cjoin = for {codeA <- codes; codeB <- codes} yield (codeA, codeB)
cjoin.filter(x => x._1 < x._2)
}
val Epsilon = args("epsilon").toDouble
val MinPoints = args("minpoints").toInt
val NumClusters = args("nclusters").toInt
val output = Tsv(args("output"))
val dists = TextLine(args("input"))
.read
// compute pair-wise distances between procedure codes
.flatMapTo('line -> ('codeA, 'codeB)) { line: String => extractPairs(line) }
.groupBy('codeA, 'codeB) { group => group.size('sim) }
.map('sim -> 'radius) { x: Int => (1.0D / x) }
.discard('sim)
// group by codeA and retain only records which are within epsilon distance
.groupBy('codeA) { group => group.sortBy('radius).reverse }
.filter('radius) { x: Double => x < Epsilon }
val codeCounts = dists
.groupBy('codeA) { group =>
group.sizeAveStdev('radius -> ('count, 'mean, 'std))
}
// only retain codes that have at least MinPoints points within Epsilon
.filter('count) { x: Int => x >= MinPoints }
.discard('std)
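// join each code's neighbor records with its neighborhood stats and
// compute density as count^2 / mean^2 (the constant pi is dropped)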
val densities = dists.joinWithSmaller(('codeA -> 'codeA), codeCounts)
.map(('mean, 'count) -> 'density) { x: (Double,Int) =>
1.0D * Math.pow(x._2, 2) / Math.pow(x._1, 2)
}
.discard('radius, 'count)
// sort the result by density descending and find the top N clusters
val densestCodes = densities.groupAll { group =>
group.sortBy('density).reverse }
.unique('codeA)
.limit(NumClusters)
// join code densities with densest codes to find final clusters
densities.joinWithTiny(('codeA -> 'codeA), densestCodes)
.groupBy('codeA) { group => group.mkString('codeB, ",")}
.write(output)
}
object CodeCluster {
def main(args: Array[String]): Unit = {
// run the job in local mode against the claims data
new CodeCluster(Args(List(
"--local", "",
"--epsilon", "0.3",
"--minpoints", "10",
"--nclusters", "10",
"--input", "data/outpatient_claims.csv",
"--output", "data/clusters.csv"
))).run
Source.fromFile("data/clusters.csv")
.getLines()
.foreach(Console.println(_))
}
}
I ran this locally with 1 million claims (out of the 16 million claims in the dataset). Each row of the output is a candidate cluster center code, followed by a comma-separated list of the codes in its cluster.
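If you want to use the clusters downstream, the output is straightforward to parse back into a map; here is a minimal sketch, assuming the tab-separated layout written by the job above.

import scala.io.Source

// parse the job's output: each line is "centerCode<TAB>member1,member2,..."
val clusters: Map[String, List[String]] =
  Source.fromFile("data/clusters.csv")
    .getLines()
    .map(_.split("\t"))
    .collect { case Array(center, members) =>
      center -> members.split(",").toList
    }
    .toMap

// e.g. clusters.get(someCode) lists the codes clustered with it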
And that's all I have for today. I'd like to point out a new book on Scalding, Programming MapReduce with Scalding by Antonios Chalkiopoulos. I was quite impressed by the book; you can read my review on Amazon if you are interested.