Converts a DataFrame column to an Array of values
N.B. This method uses collect, which brings every row to the driver, so it should only be called on small DataFrames.
This function converts a column to an array of items.
Suppose we have the following sourceDF:
+---+
|num|
+---+
|  1|
|  2|
|  3|
+---+
Let's convert the num column to an Array of values, then run the code and view the results.
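val actual = DataFrameHelpers.columnToArray[Int](sourceDF, "num")
// println on a raw Array prints its JVM identity, so format the elements explicitly
println(actual.mkString("Array(", ", ", ")")) // Array(1, 2, 3)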
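Once collect has returned the rows, extracting one column is a plain local operation. The following is a minimal, Spark-free sketch of that step, assuming each collected row is modeled as a `Seq[Any]` aligned with the column names (roughly what `Row.toSeq` and `df.columns` provide). The `CollectedColumn` object and its `valuesOf` helper are hypothetical illustrations, not spark-daria's implementation.

```scala
import scala.reflect.ClassTag

// Hypothetical sketch: rows are plain Seq[Any] values aligned with `columns`,
// not real Spark Rows.
object CollectedColumn {
  // Returns the values of `colName` from every row, preserving row order.
  def valuesOf[T: ClassTag](
      columns: Seq[String],
      rows: Seq[Seq[Any]],
      colName: String
  ): Array[T] = {
    val idx = columns.indexOf(colName)
    require(idx >= 0, s"column $colName not found in ${columns.mkString("[", ", ", "]")}")
    // The cast is unchecked (T is erased); a wrong T fails when the Array[T] is filled.
    rows.map(row => row(idx).asInstanceOf[T]).toArray
  }
}
```

With the sourceDF above, `columns` would be `Seq("num")` and the rows `Seq(Seq(1), Seq(2), Seq(3))`. Calling `.toList` on the result gives the List form used by columnToList.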
Converts a DataFrame column to a List of values
N.B. This method uses collect and should only be called on small DataFrames.
This function converts a column to a list of items.
Suppose we have the following sourceDF:
+---+
|num|
+---+
|  1|
|  2|
|  3|
+---+
Let's convert the num column to a List of values, then run the code and view the results.
val actual = DataFrameHelpers.columnToList[Int](sourceDF, "num")
println(actual) // List(1, 2, 3)
Generates a CREATE TABLE query for AWS Athena
Suppose we have the following df:
+--------+--------+---------+
|    team|   sport|goals_for|
+--------+--------+---------+
|    jets|football|       45|
|nacional|  soccer|       10|
+--------+--------+---------+
Run the code to print the CREATE TABLE query.
DataFrameHelpers.printAthenaCreateTable(
  df,
  "my_cool_athena_table",
  "s3://my-bucket/extracts/people"
)

Output:

CREATE TABLE IF NOT EXISTS my_cool_athena_table(
  team STRING,
  sport STRING,
  goals_for INT
)
STORED AS PARQUET
LOCATION 's3://my-bucket/extracts/people'
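The query is assembled from the DataFrame schema: each column becomes a `name TYPE` line between a fixed `CREATE TABLE IF NOT EXISTS` header and the `STORED AS PARQUET` / `LOCATION` footer. A minimal sketch of that assembly, assuming the schema is passed as (column name, Spark type name) pairs such as `("goals_for", "int")`, which is what `df.schema.fields.map(f => (f.name, f.dataType.simpleString))` would yield. The `AthenaDdl` object and its small type table are hypothetical and cover only a few common types.

```scala
// Hypothetical sketch: builds the DDL string instead of printing it,
// so the result is easy to inspect.
object AthenaDdl {
  // Illustrative subset of Spark simpleString type names mapped to Athena types.
  private val athenaTypes = Map(
    "string" -> "STRING",
    "int" -> "INT",
    "bigint" -> "BIGINT",
    "double" -> "DOUBLE",
    "float" -> "FLOAT",
    "boolean" -> "BOOLEAN",
    "date" -> "DATE",
    "timestamp" -> "TIMESTAMP"
  )

  def createTable(fields: Seq[(String, String)], tableName: String, s3Location: String): String = {
    // One indented "name TYPE" entry per column; unknown types fail fast.
    val columns = fields.map { case (name, sparkType) =>
      val athenaType = athenaTypes.getOrElse(
        sparkType,
        throw new IllegalArgumentException(s"unsupported type: $sparkType")
      )
      s"  $name $athenaType"
    }
    Seq(
      s"CREATE TABLE IF NOT EXISTS $tableName(",
      columns.mkString(",\n"),
      ")",
      "STORED AS PARQUET",
      s"LOCATION '$s3Location'"
    ).mkString("\n")
  }
}
```

Returning a String rather than printing keeps the sketch testable; a printing variant is just `println(AthenaDdl.createTable(...))`.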
Converts a DataFrame to an Array of Maps
N.B. This method uses collect and should only be called on small DataFrames.
Converts a DataFrame to an array of Maps.
Suppose we have the following sourceDF:
+----------+-----------+---------+
|profession|some_number|pay_grade|
+----------+-----------+---------+
|    doctor|          4|     high|
|   dentist|         10|     high|
+----------+-----------+---------+
Run the code to convert this DataFrame into an array of Maps.
val actual = DataFrameHelpers.toArrayOfMaps(sourceDF)
// actual contains:
// Array(
//   Map("profession" -> "doctor", "some_number" -> 4, "pay_grade" -> "high"),
//   Map("profession" -> "dentist", "some_number" -> 10, "pay_grade" -> "high")
// )
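Each Map pairs the DataFrame's column names with one collected row's values. A minimal, Spark-free sketch of that pairing, assuming rows are modeled as `Seq[Any]` aligned with the column names (roughly what `df.columns` and `Row.toSeq` provide). `RowsToMaps` is a hypothetical name, not part of spark-daria.

```scala
// Hypothetical sketch: rows are plain Seq[Any] values, not Spark Rows.
object RowsToMaps {
  // Zips the column names with each row's values, producing one Map per row.
  def toArrayOfMaps(columns: Seq[String], rows: Seq[Seq[Any]]): Array[Map[String, Any]] =
    rows.map { row =>
      require(row.length == columns.length, "row width must match the column count")
      columns.zip(row).toMap
    }.toArray
}
```

Because the values are typed as `Any`, callers have to cast or pattern match when reading them back, for example `maps(0)("some_number").asInstanceOf[Int]`.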
Converts two columns to a Map of key-value pairs.
N.B. This method uses collect and should only be called on small DataFrames.
Converts two columns in a DataFrame to a Map.
Suppose we have the following sourceDF:
+-----------+---------+
|     island|fun_level|
+-----------+---------+
|    boracay|        7|
|long island|        9|
+-----------+---------+
Let's convert this DataFrame to a Map with island as the key and fun_level as the value.
val actual = DataFrameHelpers.twoColumnsToMap[String, Integer](
  sourceDF,
  "island",
  "fun_level"
)
println(actual)
// Map(
//   "boracay" -> 7,
//   "long island" -> 9
// )
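The conversion reads a key column and a value column from every collected row. A minimal, Spark-free sketch of that step, again modeling rows as `Seq[Any]` aligned with the column names; `PairsToMap` is a hypothetical name. In this sketch, `toMap` keeps the value from the last row when a key appears more than once, so it is worth checking that the key column is unique.

```scala
// Hypothetical sketch: rows are plain Seq[Any] values aligned with `columns`.
object PairsToMap {
  def twoColumnsToMap[K, V](
      columns: Seq[String],
      rows: Seq[Seq[Any]],
      keyColName: String,
      valueColName: String
  ): Map[K, V] = {
    val keyIdx = columns.indexOf(keyColName)
    val valueIdx = columns.indexOf(valueColName)
    require(keyIdx >= 0 && valueIdx >= 0, "both columns must exist")
    // Unchecked casts; for duplicate keys the last row wins.
    rows.map(row => row(keyIdx).asInstanceOf[K] -> row(valueIdx).asInstanceOf[V]).toMap
  }
}
```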
Throws an error if the DataFrame contains any of the prohibited columns.
Validates that none of the given columns are included in a DataFrame. This code will error out:
import spark.implicits._ // provides toDF on local Seqs

val sourceDF = Seq(
  ("jets", "football"),
  ("nacional", "soccer")
).toDF("team", "sport")

val prohibitedColNames = Seq("team", "sport", "country", "city")

validateAbsenceOfColumns(sourceDF, prohibitedColNames)
This is the error message:
> com.github.mrpowers.spark.daria.sql.ProhibitedDataFrameColumnsException: The [team, sport] columns are not allowed to be included in the DataFrame with the following columns [team, sport]
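The check amounts to intersecting the prohibited names with the DataFrame's column names and failing when the intersection is non-empty. A minimal sketch over plain column-name sequences (what `df.columns` returns), assuming a hypothetical `AbsenceCheck` object and a hypothetical `ProhibitedColumnsError` standing in for spark-daria's `ProhibitedDataFrameColumnsException`; the message text mirrors the one shown above.

```scala
// Hypothetical stand-in for spark-daria's ProhibitedDataFrameColumnsException.
final case class ProhibitedColumnsError(message: String) extends Exception(message)

object AbsenceCheck {
  // Fails if any prohibited name is present among the actual column names.
  def validateAbsence(actualColumns: Seq[String], prohibitedColumns: Seq[String]): Unit = {
    val present = prohibitedColumns.intersect(actualColumns)
    if (present.nonEmpty)
      throw ProhibitedColumnsError(
        s"The ${present.mkString("[", ", ", "]")} columns are not allowed to be included in the DataFrame " +
          s"with the following columns ${actualColumns.mkString("[", ", ", "]")}"
      )
  }
}
```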
Throws an error if the DataFrame doesn't contain all the required columns.
Validates that all of the given columns are included in a DataFrame. This code will error out:
import spark.implicits._ // provides toDF on local Seqs

val sourceDF = Seq(
  ("jets", "football"),
  ("nacional", "soccer")
).toDF("team", "sport")

val requiredColNames = Seq("team", "sport", "country", "city")

validatePresenceOfColumns(sourceDF, requiredColNames)
This is the error message:
> com.github.mrpowers.spark.daria.sql.MissingDataFrameColumnsException: The [country, city] columns are not included in the DataFrame with the following columns [team, sport]
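Here the relevant operation is a set difference: the required names minus the names the DataFrame actually has. A minimal sketch over plain column-name sequences, assuming a hypothetical `PresenceCheck` object and a hypothetical `MissingColumnsError` standing in for spark-daria's `MissingDataFrameColumnsException`; the message text mirrors the one shown above.

```scala
// Hypothetical stand-in for spark-daria's MissingDataFrameColumnsException.
final case class MissingColumnsError(message: String) extends Exception(message)

object PresenceCheck {
  // Fails if any required name is absent from the actual column names.
  def validatePresence(actualColumns: Seq[String], requiredColumns: Seq[String]): Unit = {
    val missing = requiredColumns.diff(actualColumns)
    if (missing.nonEmpty)
      throw MissingColumnsError(
        s"The ${missing.mkString("[", ", ", "]")} columns are not included in the DataFrame " +
          s"with the following columns ${actualColumns.mkString("[", ", ", "]")}"
      )
  }
}
```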
Throws an error if the DataFrame schema doesn't match the required schema
This code will error out:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val sourceData = List(
  Row(1, 1),
  Row(-8, 8),
  Row(-5, 5),
  Row(null, null)
)

val sourceSchema = List(
  StructField("num1", IntegerType, true),
  StructField("num2", IntegerType, true)
)

val sourceDF = spark.createDataFrame(
  spark.sparkContext.parallelize(sourceData),
  StructType(sourceSchema)
)

val requiredSchema = StructType(
  List(
    StructField("num1", IntegerType, true),
    StructField("num2", IntegerType, true),
    StructField("name", StringType, true)
  )
)

validateSchema(sourceDF, requiredSchema)
This is the error message:
> com.github.mrpowers.spark.daria.sql.InvalidDataFrameSchemaException: The [StructField(name,StringType,true)] StructFields are not included in the DataFrame with the following StructFields [StructType(StructField(num1,IntegerType,true), StructField(num2,IntegerType,true))]
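The error message suggests a difference computed over whole fields rather than names. A minimal sketch of that idea, assuming a hypothetical `Field` case class in place of Spark's `StructField` and a hypothetical `InvalidSchemaError` in place of `InvalidDataFrameSchemaException`. In this sketch, a column with the right name but a different type or nullability also counts as missing; whether spark-daria compares fields exactly this way is not confirmed here.

```scala
// Hypothetical stand-in for Spark's StructField: name, type name, nullability.
final case class Field(name: String, dataType: String, nullable: Boolean)

// Hypothetical stand-in for spark-daria's InvalidDataFrameSchemaException.
final case class InvalidSchemaError(message: String) extends Exception(message)

object SchemaCheck {
  // Fails if any required field is not present with the same name, type and nullability.
  def validateSchema(actual: Seq[Field], required: Seq[Field]): Unit = {
    val missing = required.diff(actual)
    if (missing.nonEmpty)
      throw InvalidSchemaError(
        s"The ${missing.mkString("[", ", ", "]")} fields are not included in the DataFrame " +
          s"with the following fields ${actual.mkString("[", ", ", "]")}"
      )
  }
}
```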