Converting an RDD to vectors with fixed-length file data
I'm new to Spark + Scala and still developing my intuition. I have a file containing many samples of data, where every 2048 lines represents a new sample. I'm attempting to convert each sample into a vector and then run the vectors through a k-means clustering algorithm. The data file looks like this:
123.34 800.18 456.123 23.16 ...
When I'm playing with a very small subset of the data, I create an RDD from the file like this:
val fileData = sc.textFile("hdfs://path/to/file.txt")
and then create the vector using this code:
val freqLineCount = 2048
val numSamples = 200
val freqPowers = fileData.map( _.split(" ")(1).toDouble )
val allFreqs = freqPowers.take(numSamples * freqLineCount).grouped(freqLineCount)
val lotsOfVecs = allFreqs.map(spec => Vectors.dense(spec)).toArray
val lotsOfVecsRDD = sc.parallelize(lotsOfVecs).cache()

val numClusters = 2
val numIterations = 2
val clusters = KMeans.train(lotsOfVecsRDD, numClusters, numIterations)
The key here is that I can call .grouped on the array of doubles and it returns an iterator of arrays, each holding 2048 sequential values. Those are then trivial to convert to vectors and run through the KMeans training algo.
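The chunking step can be sanity-checked on a plain local collection, independent of Spark. A minimal sketch (using a chunk size of 4 instead of 2048, purely for illustration):

```scala
object GroupedDemo {
  def main(args: Array[String]): Unit = {
    // .grouped(n) splits a collection into fixed-size chunks of n elements,
    // preserving the original element order within and across chunks
    val values: Array[Double] = (1 to 8).map(_.toDouble).toArray
    val chunks: Array[Array[Double]] = values.grouped(4).toArray

    assert(chunks.length == 2)
    assert(chunks(0).sameElements(Array(1.0, 2.0, 3.0, 4.0)))
    assert(chunks(1).sameElements(Array(5.0, 6.0, 7.0, 8.0)))
    println(chunks.map(_.mkString(",")).mkString(" | "))
  }
}
```

Note that .grouped returns an Iterator, so it must be materialized (e.g. with .toArray) before indexing into it.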
I'm attempting to run this code on a much larger data set and am running into java.lang.OutOfMemoryError: Java heap space. Presumably this is because take collects the data from my freqPowers variable onto the driver, where the subsequent operations then run.
How would I go about achieving my goal of running KMeans on this data set keeping in mind that
each data sample occurs every 2048 lines in the file (so the file should be parsed somewhat sequentially)
this code needs to run on a distributed cluster
I need to not run out of memory :)
thanks in advance
You can do something like:
val freqLineCount = 2048
val freqPowers = fileData.map(_.split(" ")(1).toDouble) // Replacement of your current code.
val groupedRDD = freqPowers.zipWithIndex().groupBy(_._2 / freqLineCount)
// Sort each group by the original index, since groupBy does not
// guarantee the ordering of elements within a group after the shuffle.
val vectorRDD = groupedRDD.map(grouped =>
  Vectors.dense(grouped._2.toSeq.sortBy(_._2).map(_._1).toArray))

val numClusters = 2
val numIterations = 2
val clusters = KMeans.train(vectorRDD, numClusters, numIterations)
The replacement code uses zipWithIndex() and integer division of the long indices to group the RDD elements into chunks of freqLineCount. After the grouping, the elements in question are extracted into their actual vectors.
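The index-division idea above can be verified without a cluster. Here is a plain-Scala sketch of the same grouping logic on an in-memory sequence (chunk size 3 instead of 2048 for brevity; the object and variable names are illustrative only):

```scala
object ZipGroupDemo {
  def main(args: Array[String]): Unit = {
    val freqLineCount = 3
    val freqPowers = Seq(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)

    // zipWithIndex pairs each value with its position; dividing the
    // index by the chunk size yields a stable group key per sample
    val grouped = freqPowers.zipWithIndex.groupBy(_._2 / freqLineCount)

    // Sort groups by key and values by index to recover sample order
    val vectors = grouped.toSeq.sortBy(_._1).map(_._2.sortBy(_._2).map(_._1))

    assert(vectors == Seq(Seq(1.0, 2.0, 3.0), Seq(4.0, 5.0, 6.0)))
    println(vectors)
  }
}
```

On an RDD the mechanics are the same, except that zipWithIndex() produces Long indices and the groups land on whichever executor the shuffle sends them to, which is why sorting within each group matters there.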