Converting an RDD to vectors with fixed-length file data

I'm new to Spark + Scala and still developing my intuition. I have a file containing many samples of data. Each block of 2048 consecutive lines represents one sample. I'm attempting to convert each sample into a vector and then run the vectors through a k-means clustering algorithm. The data file looks like this:

123.34  800.18
456.123 23.16
...

When I'm playing with a very small subset of the data, I create an RDD from the file like this:

val fileData = sc.textFile("hdfs://path/to/file.txt")

and then create the vector using this code:

val freqLineCount = 2048
val numSamples    = 200
val freqPowers = fileData.map( _.split(" ")(1).toDouble )
val allFreqs    = freqPowers.take(numSamples*freqLineCount).grouped(freqLineCount)
val lotsOfVecs  = allFreqs.map(spec => Vectors.dense(spec) ).toArray
val lotsOfVecsRDD = sc.parallelize( lotsOfVecs ).cache()

val numClusters = 2
val numIterations = 2
val clusters = KMeans.train(lotsOfVecsRDD, numClusters, numIterations)

The key here is that I can call .grouped on the local array of doubles returned by take, and it returns an iterator over arrays of 2048 consecutive values. It is then trivial to convert those arrays to vectors and run them through the KMeans training algorithm.
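To illustrate with toy values (a chunk size of 3 rather than 2048; the object and value names are made up for this example), .grouped behaves roughly like this on a local array:

```scala
// Toy illustration of .grouped on a local array (no Spark involved).
object GroupedDemo {
  // Stand-in for the Array[Double] that take(...) returns on the driver.
  val powers: Array[Double] = Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)

  // grouped yields an Iterator[Array[Double]] of consecutive fixed-size chunks;
  // toList materializes it so the chunks can be inspected.
  val samples: List[Array[Double]] = powers.grouped(3).toList
}
```

This only works because the whole array already sits in local memory, which is exactly what stops scaling once the data no longer fits on the driver.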

I'm attempting to run this code on a much larger data set and am running into java.lang.OutOfMemoryError: Java heap space errors, presumably because I'm calling take on freqPowers, which pulls all of the requested elements into driver memory, and then performing further operations on that data.

How would I go about achieving my goal of running KMeans on this data set, keeping in mind that:

  1. each data sample occurs every 2048 lines in the file (so the file should be parsed somewhat sequentially)

  2. this code needs to run on a distributed cluster

  3. I need to not run out of memory :)

Thanks in advance.

Answers


You can do something like:

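import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val freqLineCount = 2048
// map, not flatMap: each line yields exactly one Double.
val freqPowers = fileData.map(_.split(" ")(1).toDouble)

// Replacement for your current code: key each value by its sample number (line index / 2048).
val groupedRDD = freqPowers.zipWithIndex().groupBy(_._2 / freqLineCount)
// groupBy does not guarantee the order within a group, so sort by line index first.
val vectorRDD = groupedRDD.map { case (_, pairs) =>
  Vectors.dense(pairs.toArray.sortBy(_._2).map(_._1))
}.cache() // KMeans is iterative, so cache its input

val numClusters = 2
val numIterations = 2
val clusters = KMeans.train(vectorRDD, numClusters, numIterations)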

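The replacement code uses zipWithIndex() to attach a Long line index to every value, then groups by the integer division of that index by freqLineCount, so each run of freqLineCount consecutive lines ends up in the same group. After the grouping, the values of each group are extracted into a vector. Because groupBy does not guarantee the order of elements within a group, sort each group by its index before building the vector if the order of the values matters. Nothing is collected to the driver, so the work stays distributed.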
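The index-division idea can be tried out on plain Scala collections without a cluster. The following is only a sketch: ChunkByIndex and chunkByIndex are hypothetical names introduced for illustration, and the local zipWithIndex/groupBy merely mimic what the RDD versions do.

```scala
// Local-collection sketch of grouping by (index / chunkSize); no Spark needed.
object ChunkByIndex {
  // Hypothetical helper: splits `values` into consecutive chunks of `chunkSize`.
  def chunkByIndex(values: Seq[Double], chunkSize: Int): Seq[Vector[Double]] =
    values.zipWithIndex                              // pair each value with its position
      .groupBy { case (_, idx) => idx / chunkSize }  // integer division gives the chunk id
      .toSeq
      .sortBy { case (chunkId, _) => chunkId }       // groupBy returns an unordered Map
      .map { case (_, pairs) =>
        pairs.sortBy(_._2).map(_._1).toVector        // restore original order, drop the index
      }
}
```

For example, ChunkByIndex.chunkByIndex(Seq(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0), 3) yields three chunks, the last holding only 7.0. On real data, a trailing partial sample like that would presumably need to be filtered out before KMeans, since all input vectors must have the same dimension.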

