Read CSV as dataframe and convert to JSON string

I'm trying to aggregate a CSV file via Spark SQL and then show the result as JSON:

val people = sqlContext.read().format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").option("delimiter", ",").load("/tmp/people.csv")  
people.registerTempTable("people")  
val result = sqlContext.sql("select country, count(*) as cnt from people group by country")

That's where I'm stuck. I can to a result.schema().prettyJson() which works flawlessly, but I don't find a way to return the result as JSON.

I was assuming that result.toJSON.collect() should do what I desire, but this fails with a

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 101.0 failed 1 times, most recent failure: Lost task 1.0 in stage 101.0 (TID 159, localhost): java.lang.NegativeArraySizeException
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:171)
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:162)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:511)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:704)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:704)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

error. Can somebody guide me?

Answers


The error you're getting is odd, it sounds like result is probably empty?

You might want to try this command on the dataframe to get each line printed out instead:

result.toJSON.foreach(println)

See the Dataframe API for a little more information


Turns out this error was because of a "malformed" CSV file. It contained some rows which had more columns than others (with no header field name)... Strange error message though.


Try

val people = sqlContext.read().format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("mode", "DROPMALFORMED")
  .option("delimiter", ",")
  .load("/tmp/people.csv")  
people.registerTempTable("people")  
val result = sqlContext.sql("select country, count(*) as cnt from people group by country")

Need Your Help

Cakephp, Retrieve Data for HABTM Models using conditional find

php cakephp find has-and-belongs-to-many

There are 2 Models: Project &amp; Category that are bind with HABTM relationship.

API for parse/update UNIX configuration files

api unix configuration-files

Unix configuration files come in all shapes and forms. I know that Webmin has a Perl API that makes it easy to parse and modify most common configuration programmatically, while preserving changes ...