Spark Multiclass Classification Example

Do you guys know where I can find examples of multiclass classification in Spark? I spent a lot of time searching in books and on the web, and so far I just know that it is possible since the latest version, according to the documentation.



ML (Recommended in Spark 2.0+)

We'll use the same data as in the MLlib section below. There are two basic options. If the Estimator supports multiclass classification out-of-the-box (for example, random forest), you can use it directly:

import{Tokenizer, CountVectorizer, StringIndexer}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.Pipeline

val trainRawDf = trainRaw.toDF

val transformers = Array(
  new StringIndexer().setInputCol("group").setOutputCol("label"),
  new Tokenizer().setInputCol("text").setOutputCol("tokens"),
  new CountVectorizer().setInputCol("tokens").setOutputCol("features")
)

val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")

val model = new Pipeline().setStages(transformers :+ rf).fit(trainRawDf)


If the model supports only binary classification (logistic regression) and extends o.a.s.ml.classification.Classifier, you can use the one-vs-rest strategy:


import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}

val lr = new LogisticRegression()
  .setLabelCol("label")
  .setFeaturesCol("features")

val ovr = new OneVsRest().setClassifier(lr)

val ovrModel = new Pipeline().setStages(transformers :+ ovr).fit(trainRawDf)
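
Either fitted pipeline can then score new data directly. A minimal sketch, assuming a testRawDf built the same way as trainRawDf (the testRaw RDD is defined in the MLlib section below):

val testRawDf = testRaw.toDF

// The pipeline reapplies the indexer, tokenizer and vectorizer, then
// appends a "prediction" column holding the predicted label index.
model.transform(testRawDf).select("group", "prediction").show()
ovrModel.transform(testRawDf).select("group", "prediction").show()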


MLlib

According to the official documentation, at this moment (MLlib 1.6.0) the following methods support multiclass classification:

  • logistic regression,
  • decision trees,
  • random forests,
  • naive Bayes

At least some of the bundled examples, such as the naive Bayes one, use multiclass classification; a rough sketch of that approach follows.
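
A minimal, hedged sketch of multiclass naive Bayes with MLlib; the three classes and feature values here are made up purely for illustration:

import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Toy data: three classes encoded as 0.0, 1.0, 2.0 (hypothetical values)
val nbData = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(1.0, 0.0, 0.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.0, 0.0)),
  LabeledPoint(2.0, Vectors.dense(0.0, 0.0, 1.0))
))

// lambda is the additive smoothing parameter
val nbModel = NaiveBayes.train(nbData, lambda = 1.0)
nbModel.predict(Vectors.dense(0.0, 1.0, 0.0))  // expected: 1.0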

The general framework, ignoring method-specific arguments, is pretty much the same as for all the other methods in MLlib. You have to pre-process your input to create either a data frame with columns representing the label and features:

 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

or RDD[LabeledPoint].
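
To make the expected shape concrete, here is a minimal sketch with made-up values covering both variants:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// RDD[LabeledPoint]: a Double label plus a feature Vector
val pointsRdd = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(1.0, 0.0)),
  LabeledPoint(2.0, Vectors.dense(0.0, 1.0))
))

// Equivalent DataFrame with "label" and "features" columns
// (LabeledPoint is a case class, so createDataFrame can reflect on it)
val pointsDf = sqlContext.createDataFrame(pointsRdd)
pointsDf.printSchema()  // matches the schema shown above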

Spark provides a broad range of useful tools designed to facilitate this process, including feature extractors, feature transformers, and pipelines.

You'll find a rather naive example of using Random Forest below.

First let's import the required packages and create dummy data:

import sqlContext.implicits._
import{HashingTF, Tokenizer, StringIndexer}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD

case class LabeledRecord(group: String, text: String)

val trainRaw = sc.parallelize(
    LabeledRecord("foo", "foo v a y b  foo") ::
    LabeledRecord("bar", "x bar y bar v") ::
    LabeledRecord("bar", "x a y bar z") ::
    LabeledRecord("foobar", "foo v b bar z") ::
    LabeledRecord("foo", "foo x") ::
    LabeledRecord("foobar", "z y x foo a b bar v") ::
    Nil
)

Now let's define the required transformers and process the train Dataset:

// Tokenizer to process text fields
val tokenizer = new Tokenizer()
    .setInputCol("text")
    .setOutputCol("words")

// HashingTF to convert tokens to the feature vector
// (small feature space, enough for this toy vocabulary)
val hashingTF = new HashingTF()
    .setInputCol("words")
    .setOutputCol("features")
    .setNumFeatures(10)

// Indexer to convert String labels to Double
val indexer = new StringIndexer()
    .setInputCol("group")
    .setOutputCol("label")
    .fit(trainRaw.toDF)

def transform(rdd: RDD[LabeledRecord]) = {
    val tokenized = tokenizer.transform(rdd.toDF)
    val hashed = hashingTF.transform(tokenized)
    indexer.transform(hashed)
        .select($"label", $"features")
        .map { case Row(label: Double, features: Vector) =>
            LabeledPoint(label, features) }
}

val train: RDD[LabeledPoint] = transform(trainRaw)

Please note that the indexer is "fitted" on the train data. It simply means that the categorical values used as labels are converted to doubles. To use the classifier on new data, you have to transform that data first using this indexer.
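
As a side note, the fitted StringIndexerModel also exposes the learned labels, which lets you map predicted indices back to the original strings. A minimal sketch (the decoding line is commented out because the model is only trained below):

// indexer.labels(i) is the original string label for index i
val labelNames: Array[String] = indexer.labels
// Once the RF model below is trained, a prediction can be decoded as:
// labelNames(model.predict(lp.features).toInt)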

Next we can train the RF model:

val numClasses = 3
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 10
val featureSubsetStrategy = "auto"
val impurity = "gini"
val maxDepth = 4
val maxBins = 16

val model = RandomForest.trainClassifier(
    train, numClasses, categoricalFeaturesInfo,
    numTrees, featureSubsetStrategy, impurity,
    maxDepth, maxBins
)

and finally test it:

val testRaw = sc.parallelize(
    LabeledRecord("foo", "foo  foo z z z") ::
    LabeledRecord("bar", "z bar y y v") ::
    LabeledRecord("bar", "a a  bar a z") ::
    LabeledRecord("foobar", "foo v b bar z") ::
    LabeledRecord("foobar", "a foo a bar") ::
    Nil
)

val test: RDD[LabeledPoint] = transform(testRaw)

val predsAndLabs = => (model.predict(lp.features), lp.label))
val metrics = new MulticlassMetrics(predsAndLabs)


Are you using Spark 1.6 rather than Spark 2.1? I think the problem is that in Spark 2.1 the transform method returns a Dataset, which can be implicitly converted to a typed RDD, whereas prior to that it returned a DataFrame of Rows.

As a diagnostic, try specifying the return type of the transform function as RDD[LabeledPoint] and see if you get the same error.
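
Concretely, a sketch of that diagnostic against the transform function from the answer above: under Spark 1.6, where on a DataFrame returns an RDD, this compiles, while on 2.x the Dataset return type makes the mismatch a compile-time error.

// Same body as the transform function above, with the return type pinned.
def transform(rdd: RDD[LabeledRecord]): RDD[LabeledPoint] = {
    val tokenized = tokenizer.transform(rdd.toDF)
    val hashed = hashingTF.transform(tokenized)
    indexer.transform(hashed)
        .select($"label", $"features")
        .map { case Row(label: Double, features: Vector) =>
            LabeledPoint(label, features) }
}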
