'reading and writing from hive tables with spark after aggregation

We have a hive warehouse, and wanted to use spark for various tasks (mainly classification). At times write the results back as a hive table. For example, we wrote the following python function to find the total sum of original_table column two, grouped by original_table column one. The function works, but we are worried that it is inefficient, particularly the maps to convert to key-value pairs, and dictionary versions. Functions combiner, mergeValue, mergeCombiner are defined elsewhere, but work fine.

from pyspark import HiveContext

rdd = HiveContext(sc).sql('from original_table select *')

#convert to key-value pairs
key_value_rdd = rdd.map(lambda x: (x[0], int(x[1])))

#create rdd where rows are (key, (sum, count)
combined = key_value_rdd.combineByKey(combiner, mergeValue, mergeCombiner)

# creates rdd with dictionary values in order to create schemardd
dict_rdd = combined.map(lambda x: {'k1': x[0], 'v1': x[1][0], 'v2': x[1][1]})

# infer the schema
schema_rdd = HiveContext(sc).inferSchema(dict_rdd)

# save
schema_rdd.saveAsTable('new_table_name')

Are there more efficient ways of doing the same thing?



Solution 1:[1]

...perhaps this was not possible when the question was written, but doesn't it make sense now (post 1.3) to use the createDataFrame() call?

After getting your first RDD, it looks like you could make the call, then run a simple SQL statement against the structure to get the whole job done in one pass. (Sum and Grouping) Plus, the DataFrame structure can infer schema directly upon creation if I'm reading the API doc correctly.

(http://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html#pyspark.sql.HiveContext)

Solution 2:[2]

This error can solved by setting hive.exec.scratchdir to the folder where user has access

Solution 3:[3]

What version of spark you are using ?

This answer is based on 1.6 & using the data frames.

val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext.implicits._
val client = Seq((1, "A", 10), (2, "A", 5), (3, "B", 56)).toDF("ID", "Categ", "Amnt")

    import org.apache.spark.sql.functions._
    client.groupBy("Categ").agg(sum("Amnt").as("Sum"), count("ID").as("count")).show()


+-----+---+-----+
|Categ|Sum|count|
+-----+---+-----+
|    A| 15|    2|
|    B| 56|    1|
+-----+---+-----+

Hope this helps !!

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 agentv
Solution 2 dandan78
Solution 3 venkat_d