Object com.github.mrpowers.spark.daria.sql.DataFrameHelpers

object DataFrameHelpers extends DataFrameValidator

Linear Supertypes: DataFrameValidator, AnyRef, Any

Value Members

  6. def columnToArray[T](df: DataFrame, colName: String)(implicit arg0: ClassTag[T]): Array[T]

    Converts a DataFrame column to an Array of values. N.B. This method uses collect and should only be called on small DataFrames.

    Suppose we have the following sourceDF:

    +---+
    |num|
    +---+
    |  1|
    |  2|
    |  3|
    +---+

    Let's convert the num column to an Array of values and view the result.

    val actual = DataFrameHelpers.columnToArray[Int](sourceDF, "num")
    
    println(actual)
    
    // Array(1, 2, 3)
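    As a hedged sketch (not necessarily the library's actual source), a helper like this can be written by collecting the single column and casting each value; columnToArrayImpl is a hypothetical name:

```scala
import scala.reflect.ClassTag
import org.apache.spark.sql.DataFrame

// Hypothetical sketch of a columnToArray-style helper: collect() the
// single column to the driver and cast each value to T. Because it
// collects, it is only suitable for small DataFrames.
def columnToArrayImpl[T: ClassTag](df: DataFrame, colName: String): Array[T] =
  df.select(colName).collect().map(r => r(0).asInstanceOf[T])
```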
  7. def columnToList[T](df: DataFrame, colName: String)(implicit arg0: ClassTag[T]): List[T]

    Converts a DataFrame column to a List of values. N.B. This method uses collect and should only be called on small DataFrames.

    Suppose we have the following sourceDF:

    +---+
    |num|
    +---+
    |  1|
    |  2|
    |  3|
    +---+

    Let's convert the num column to a List of values and view the result.

    val actual = DataFrameHelpers.columnToList[Int](sourceDF, "num")
    
    println(actual)
    
    // List(1, 2, 3)
  17. def printAthenaCreateTable(df: DataFrame, athenaTableName: String, s3location: String): Unit

    Generates a CREATE TABLE query for AWS Athena.

    Suppose we have the following df:

    +--------+--------+---------+
    |    team|   sport|goals_for|
    +--------+--------+---------+
    |    jets|football|       45|
    |nacional|  soccer|       10|
    +--------+--------+---------+

    Run the code to print the CREATE TABLE query.

    DataFrameHelpers.printAthenaCreateTable(df, "my_cool_athena_table", "s3://my-bucket/extracts/people")
    
    CREATE TABLE IF NOT EXISTS my_cool_athena_table(
      team STRING,
      sport STRING,
      goals_for INT
    )
    STORED AS PARQUET
    LOCATION 's3://my-bucket/extracts/people'
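    The query above implies a mapping from Spark SQL types to Athena column types. A hedged sketch of such a mapping, consistent with the STRING/INT columns printed above (athenaType is a hypothetical helper; the actual method may cover more types):

```scala
import org.apache.spark.sql.types._

// Hypothetical mapping from common Spark SQL types to Athena column
// types. Unrecognized types fall back to Spark's simpleString form.
def athenaType(dt: DataType): String = dt match {
  case StringType  => "STRING"
  case IntegerType => "INT"
  case LongType    => "BIGINT"
  case DoubleType  => "DOUBLE"
  case BooleanType => "BOOLEAN"
  case other       => other.simpleString.toUpperCase
}
```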
  18. def readTimestamped(dirname: String): DataFrame

  19. lazy val spark: SparkSession

  21. def toArrayOfMaps(df: DataFrame): Array[Map[String, Any]]

    Converts a DataFrame to an Array of Maps. N.B. This method uses collect and should only be called on small DataFrames.

    Suppose we have the following sourceDF:

    +----------+-----------+---------+
    |profession|some_number|pay_grade|
    +----------+-----------+---------+
    |    doctor|          4|     high|
    |   dentist|         10|     high|
    +----------+-----------+---------+

    Run the code to convert this DataFrame into an array of Maps.

    val actual = DataFrameHelpers.toArrayOfMaps(sourceDF)
    
    println(actual)
    
    Array(
      Map("profession" -> "doctor", "some_number" -> 4, "pay_grade" -> "high"),
      Map("profession" -> "dentist", "some_number" -> 10, "pay_grade" -> "high")
    )
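    A helper with this behavior can be sketched with Row.getValuesMap; this is a hedged sketch under that assumption, not necessarily the library's implementation (toArrayOfMapsImpl is a hypothetical name):

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical sketch: collect the rows to the driver and turn each
// Row into a Map keyed by column name. Uses collect(), so only
// suitable for small DataFrames.
def toArrayOfMapsImpl(df: DataFrame): Array[Map[String, Any]] =
  df.collect().map(row => row.getValuesMap[Any](df.columns))
```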
  23. def twoColumnsToMap[keyType, valueType](df: DataFrame, keyColName: String, valueColName: String)(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[keyType], arg1: scala.reflect.api.JavaUniverse.TypeTag[valueType]): Map[keyType, valueType]

    Converts two columns to a Map of key-value pairs. N.B. This method uses collect and should only be called on small DataFrames.

    Suppose we have the following sourceDF:

    +-----------+---------+
    |     island|fun_level|
    +-----------+---------+
    |    boracay|        7|
    |long island|        9|
    +-----------+---------+

    Let's convert this DataFrame to a Map with island as the key and fun_level as the value.

    val actual = DataFrameHelpers.twoColumnsToMap[String, Integer](
      sourceDF,
      "island",
      "fun_level"
    )
    
    println(actual)
    
    // Map(
    //   "boracay" -> 7,
    //   "long island" -> 9
    // )
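    A hedged sketch of how such a helper can be written (twoColumnsToMapImpl is a hypothetical name; the real method uses TypeTags, while this simplified version uses plain casts):

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical sketch: collect the two columns and build a Map.
// If the key column contains duplicate values, later rows win.
def twoColumnsToMapImpl[K, V](df: DataFrame, keyColName: String, valueColName: String): Map[K, V] =
  df.select(keyColName, valueColName)
    .collect()
    .map(r => r(0).asInstanceOf[K] -> r(1).asInstanceOf[V])
    .toMap
```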
  24. def validateAbsenceOfColumns(df: DataFrame, prohibitedColNames: Seq[String]): Unit

    Throws an error if the DataFrame contains any of the prohibited columns. This code will error out:

    val sourceDF = Seq(
      ("jets", "football"),
      ("nacional", "soccer")
    ).toDF("team", "sport")
    
    val prohibitedColNames = Seq("team", "sport", "country", "city")
    
    validateAbsenceOfColumns(sourceDF, prohibitedColNames)

    This is the error message:

    > com.github.mrpowers.spark.daria.sql.ProhibitedDataFrameColumnsException: The [team, sport] columns are not allowed to be included in the DataFrame with the following columns [team, sport]

    Definition Classes
    DataFrameValidator
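    The validation logic can be sketched as an intersection check; this is a hedged sketch, and a generic exception stands in for the library's ProhibitedDataFrameColumnsException (validateAbsenceOfColumnsImpl is a hypothetical name):

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical sketch: intersect the DataFrame's columns with the
// prohibited names and fail if any overlap exists.
def validateAbsenceOfColumnsImpl(df: DataFrame, prohibitedColNames: Seq[String]): Unit = {
  val extra = df.columns.toSeq.intersect(prohibitedColNames)
  if (extra.nonEmpty)
    throw new IllegalArgumentException(
      s"The [${extra.mkString(", ")}] columns are not allowed to be included in the DataFrame"
    )
}
```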
  25. def validatePresenceOfColumns(df: DataFrame, requiredColNames: Seq[String]): Unit

    Throws an error if the DataFrame doesn't contain all of the required columns. This code will error out:

    val sourceDF = Seq(
      ("jets", "football"),
      ("nacional", "soccer")
    ).toDF("team", "sport")
    
    val requiredColNames = Seq("team", "sport", "country", "city")
    
    validatePresenceOfColumns(sourceDF, requiredColNames)

    This is the error message:

    > com.github.mrpowers.spark.daria.sql.MissingDataFrameColumnsException: The [country, city] columns are not included in the DataFrame with the following columns [team, sport]

    Definition Classes
    DataFrameValidator
  26. def validateSchema(df: DataFrame, requiredSchema: StructType): Unit

    Throws an error if the DataFrame schema doesn't match the required schema. This code will error out:

    val sourceData = List(
      Row(1, 1),
      Row(-8, 8),
      Row(-5, 5),
      Row(null, null)
    )
    
    val sourceSchema = List(
      StructField("num1", IntegerType, true),
      StructField("num2", IntegerType, true)
    )
    
    val sourceDF = spark.createDataFrame(
      spark.sparkContext.parallelize(sourceData),
      StructType(sourceSchema)
    )
    
    val requiredSchema = StructType(
      List(
        StructField("num1", IntegerType, true),
        StructField("num2", IntegerType, true),
        StructField("name", StringType, true)
      )
    )
    
    validateSchema(sourceDF, requiredSchema)

    This is the error message:

    > com.github.mrpowers.spark.daria.sql.InvalidDataFrameSchemaException: The [StructField(name,StringType,true)] StructFields are not included in the DataFrame with the following StructFields [StructType(StructField(num1,IntegerType,true), StructField(num2,IntegerType,true))]

    Definition Classes
    DataFrameValidator
  30. def writeTimestamped(df: DataFrame, outputDirname: String, numPartitions: Option[Int] = None, overwriteLatest: Boolean = true): Unit

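    writeTimestamped and readTimestamped carry no documentation here. Judging only from the signatures, a plausible usage pattern is writing snapshots of a DataFrame under an output directory and reading the data back from that directory; the path below is purely illustrative and the exact directory layout is an assumption:

```scala
// Hypothetical usage inferred from the signatures alone; the
// timestamping scheme and overwriteLatest semantics are not
// documented here.
DataFrameHelpers.writeTimestamped(df, "s3://my-bucket/extracts/people", numPartitions = Some(4))
val latest = DataFrameHelpers.readTimestamped("s3://my-bucket/extracts/people")
```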
