append_without_duplicates(delta_table, append_df, p_keys)

Parameters:

delta_table (DeltaTable, required): The target Delta table to append to.
append_df (DataFrame, required): The DataFrame with rows to append.
p_keys (List[str], required): Primary-key columns used to detect duplicate rows.

Raises:

TypeError: Raised when input arguments have an invalid type.

Source code in mack/__init__.py
def append_without_duplicates(
    delta_table: DeltaTable, append_df: DataFrame, p_keys: List[str]
) -> None:
    """
    <description>

    :param delta_table: <description>
    :type delta_table: DeltaTable
    :param append_df: <description>
    :type append_df: DataFrame
    :param p_keys: <description>
    :type p_keys: List[str]

    :raises TypeError: Raises type error when input arguments have a invalid type.
    """
    if not isinstance(delta_table, DeltaTable):
        raise TypeError("An existing delta table must be specified.")

    condition_columns = []
    for column in p_keys:
        condition_columns.append(f"old.{column} = new.{column}")

    condition_columns = " AND ".join(condition_columns)

    deduplicated_append_df = append_df.drop_duplicates(p_keys)

    # Insert records without duplicates
    delta_table.alias("old").merge(
        deduplicated_append_df.alias("new"), condition_columns
    ).whenNotMatchedInsertAll().execute()
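
Example usage, as a minimal sketch: it assumes an active SparkSession named spark with Delta Lake enabled and a hypothetical existing table at /tmp/people.

import mack
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/tmp/people")  # hypothetical path
new_rows = spark.createDataFrame([(1, "Alice"), (4, "Dana")], ["id", "name"])

# Rows whose id already exists in the table (or is repeated in new_rows) are skipped
mack.append_without_duplicates(delta_table, new_rows, ["id"])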

constraint_append(delta_table, append_df, quarantine_table)

Parameters:

delta_table (DeltaTable, required): The target Delta table with CHECK and/or NOT NULL constraints.
append_df (DataFrame, required): The DataFrame with rows to append.
quarantine_table (DeltaTable, required): The Delta table that receives rows violating the constraints.

Raises:

TypeError: Raised when input arguments have an invalid type.
TypeError: Raised when delta_table has no constraints.

Source code in mack/__init__.py
def constraint_append(
    delta_table: DeltaTable, append_df: DataFrame, quarantine_table: DeltaTable
):
    """
    <description>

    :param delta_table: <description>
    :type delta_table: DeltaTable
    :param append_df: <description>
    :type append_df: DataFrame
    :param quarantine_table: <description>
    :type quarantine_table: DeltaTable

    :raises TypeError: Raises type error when input arguments have an invalid type.
    :raises TypeError: Raises type error when delta_table has no constraints.
    """

    if not isinstance(delta_table, DeltaTable):
        raise TypeError("An existing delta table must be specified for delta_table.")

    if not isinstance(append_df, DataFrame):
        raise TypeError("You must provide a DataFrame that is to be appended.")

    if quarantine_table is not None and not isinstance(quarantine_table, DeltaTable):
        raise TypeError(
            "An existing delta table must be specified for quarantine_table."
        )

    properties = delta_table.detail().select("properties").collect()[0]["properties"]
    check_constraints = [
        v for k, v in properties.items() if k.startswith("delta.constraints")
    ]

    # add null checks
    fields = delta_table.toDF().schema.fields
    null_constraints = [
        f"{field.name} is not null" for field in fields if not field.nullable
    ]

    constraints = check_constraints + null_constraints

    if not constraints:
        raise TypeError("There are no constraints present in the target delta table")

    target_details = delta_table.detail().select("location").collect()[0]
    if quarantine_table:
        quarantine_details = quarantine_table.detail().select("location").collect()[0]
        quarantine_df = append_df.filter(
            "not (" + " and ".join([c for c in constraints]) + ")"
        )
        (
            quarantine_df.write.format("delta")
            .mode("append")
            .option("mergeSchema", "true")
            .save(quarantine_details["location"])
        )

    filtered_df = append_df.filter(" and ".join([c for c in constraints]))
    (
        filtered_df.write.format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .save(target_details["location"])
    )
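
Example usage, as a minimal sketch: it assumes an active SparkSession named spark with Delta Lake enabled, a hypothetical target table created with a CHECK constraint, and a quarantine table with a compatible schema.

import mack
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/tmp/orders")             # has e.g. CHECK (amount > 0)
quarantine = DeltaTable.forPath(spark, "/tmp/orders_quarantine")
batch = spark.createDataFrame([(1, 100), (2, -5)], ["id", "amount"])

# id 1 satisfies the constraint and is appended to the target;
# id 2 violates it and is written to the quarantine table instead
mack.constraint_append(target, batch, quarantine)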

copy_table(delta_table, target_path='', target_table='')

Parameters:

delta_table (DeltaTable, required): The source Delta table to copy.
target_path (str, optional): The path to copy the table to. Defaults to an empty string.
target_table (str, optional): The metastore table name to copy the table to. Defaults to an empty string.

Raises:

TypeError: Raised when input arguments have an invalid type, are missing, or are empty.

Source code in mack/__init__.py
def copy_table(
    delta_table: DeltaTable, target_path: str = "", target_table: str = ""
) -> None:
    """
    <description>

    :param delta_table: <description>
    :type delta_table: DeltaTable
    :param target_path: <description>, defaults to empty string.
    :type target_path: str
    :param target_table: <description>, defaults to empty string.
    :type target_table: str

    :raises TypeError: Raises type error when input arguments have a invalid type, are missing or are empty.
    """
    if not isinstance(delta_table, DeltaTable):
        raise TypeError("An existing delta table must be specified.")

    if not target_path and not target_table:
        raise TypeError("Either target_path or target_table must be specified.")

    origin_table = delta_table.toDF()

    details = delta_table.detail().select("partitionColumns", "properties").collect()[0]

    if target_table:
        (
            origin_table.write.format("delta")
            .partitionBy(details["partitionColumns"])
            .options(**details["properties"])
            .saveAsTable(target_table)
        )
    else:
        (
            origin_table.write.format("delta")
            .partitionBy(details["partitionColumns"])
            .options(**details["properties"])
            .save(target_path)
        )
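
Example usage, as a minimal sketch (hypothetical paths, assuming an active SparkSession named spark):

import mack
from delta.tables import DeltaTable

source = DeltaTable.forPath(spark, "/tmp/events")

# Copy to a new path, preserving partitioning and table properties
mack.copy_table(source, target_path="/tmp/events_copy")

# Or register the copy as a metastore table instead
mack.copy_table(source, target_table="events_copy")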

delta_file_sizes(delta_table)

Parameters:

delta_table (DeltaTable, required): The Delta table to inspect.

Returns:

Dict[str, int]: A dictionary with the keys size_in_bytes, number_of_files, and average_file_size_in_bytes.

Source code in mack/__init__.py
def delta_file_sizes(delta_table: DeltaTable) -> Dict[str, int]:
    """
    <description>

    :param delta_table: <description>
    :type delta_table: DeltaTable

    :returns: <description>
    :rtype: Dict[str, int]
    """
    details = delta_table.detail().select("numFiles", "sizeInBytes").collect()[0]
    size_in_bytes, number_of_files = details["sizeInBytes"], details["numFiles"]
    average_file_size_in_bytes = round(size_in_bytes / number_of_files, 0)

    return {
        "size_in_bytes": size_in_bytes,
        "number_of_files": number_of_files,
        "average_file_size_in_bytes": average_file_size_in_bytes,
    }
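
Example usage, as a minimal sketch (hypothetical path and output, assuming an active SparkSession named spark):

import mack
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/tmp/events")
sizes = mack.delta_file_sizes(delta_table)
print(sizes)  # e.g. {'size_in_bytes': 1320, 'number_of_files': 2, 'average_file_size_in_bytes': 660}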

drop_duplicates(delta_table, duplication_columns)

Parameters:

delta_table (DeltaTable, required): The Delta table to deduplicate in place.
duplication_columns (List[str], required): The columns used to identify duplicate rows.

Raises:

TypeError: Raised when input arguments have an invalid type, are missing, or are empty.

Source code in mack/__init__.py
def drop_duplicates(delta_table: DeltaTable, duplication_columns: List[str]) -> None:
    """
    <description>

    :param delta_table: <description>
    :type delta_table: DeltaTable
    :param duplication_columns: <description>
    :type duplication_columns: List[str]

    :raises TypeError: Raises type error when input arguments have a invalid type, are missing or are empty.
    """
    if not isinstance(delta_table, DeltaTable):
        raise TypeError("An existing delta table must be specified.")

    if not duplication_columns or len(duplication_columns) == 0:
        raise TypeError("A duplication column must be specified.")

    data_frame = delta_table.toDF()

    details = delta_table.detail().select("location").collect()[0]

    (
        data_frame.drop_duplicates(duplication_columns)
        .write.format("delta")
        .mode("overwrite")
        .save(details["location"])
    )
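
Example usage, as a minimal sketch (hypothetical path and columns, assuming an active SparkSession named spark):

import mack
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/tmp/customers")

# Keeps one row per unique (first_name, last_name) pair and overwrites the table
mack.drop_duplicates(delta_table, ["first_name", "last_name"])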

drop_duplicates_pkey(delta_table, primary_key, duplication_columns)

Parameters:

delta_table (DeltaTable, required): The Delta table to deduplicate in place.
primary_key (str, required): The table's unique primary-key column; must not be one of the duplication columns.
duplication_columns (List[str], required): The columns used to identify duplicate rows.

Raises:

TypeError: Raised when input arguments have an invalid type, are missing, or are empty.
TypeError: Raised when required columns are missing from the provided delta table.

Source code in mack/__init__.py
def drop_duplicates_pkey(
    delta_table: DeltaTable, primary_key: str, duplication_columns: List[str]
) -> None:
    """
    <description>

    :param delta_table: <description>
    :type delta_table: DeltaTable
    :param primary_key: <description>
    :type primary_key: str
    :param duplication_columns: <description>
    :type duplication_columns: List[str]

    :raises TypeError: Raises type error when input arguments have a invalid type, are missing or are empty.
    :raises TypeError: Raises type error when required columns are missing in the provided delta table.
    """
    if not isinstance(delta_table, DeltaTable):
        raise TypeError("An existing delta table must be specified.")

    if not primary_key:
        raise TypeError("A unique primary key must be specified.")

    if not duplication_columns or len(duplication_columns) == 0:
        raise TypeError("A duplication column must be specified.")

    if primary_key in duplication_columns:
        raise TypeError("Primary key must not be part of the duplication columns.")

    data_frame = delta_table.toDF()

    # Make sure that all the required columns are present in the provided delta table
    append_data_columns = data_frame.columns
    required_columns = [primary_key] + duplication_columns
    for required_column in required_columns:
        if required_column not in append_data_columns:
            raise TypeError(
                f"The base table has these columns {append_data_columns!r}, but these columns are required {required_columns!r}"
            )

    q = []

    duplicate_records = (
        data_frame.withColumn(
            "row_number",
            row_number().over(
                Window().partitionBy(duplication_columns).orderBy(primary_key)
            ),
        )
        .filter(col("row_number") > 1)
        .drop("row_number")
        .distinct()
    )
    for column in required_columns:
        q.append(f"old.{column} = new.{column}")

    q = " AND ".join(q)

    # Remove all the duplicate records
    delta_table.alias("old").merge(
        duplicate_records.alias("new"), q
    ).whenMatchedDelete().execute()
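
Example usage, as a minimal sketch (hypothetical path and columns, assuming an active SparkSession named spark):

import mack
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/tmp/customers")

# For rows that share (first_name, last_name), only the one with the
# lowest id survives; all other duplicates are deleted
mack.drop_duplicates_pkey(delta_table, "id", ["first_name", "last_name"])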

find_composite_key_candidates(df, exclude_cols=None)

Parameters:

df (Union[DeltaTable, DataFrame], required): The Delta table or DataFrame to analyze.
exclude_cols (List[str], optional): Columns to exclude from the search. Defaults to None.

Returns:

List: The smallest combination of columns that uniquely identifies every row.

Raises:

ValueError: Raised when no composite key candidates can be identified.

Source code in mack/__init__.py
def find_composite_key_candidates(
    df: Union[DeltaTable, DataFrame], exclude_cols: List[str] = None
) -> List:
    """
    <description>

    :param df: <description>
    :type df: DeltaTable or DataFrame
    :param exclude_cols: <description>
    :type exclude_cols: List[str], defaults to None.

    :raises TypeError: Raises type error when no composite key can be found.

    :returns: <description>
    :rtype: List
    """
    if type(df) == DeltaTable:
        df = df.toDF()
    if exclude_cols is None:
        exclude_cols = []
    df_col_excluded = df.drop(*exclude_cols)
    total_cols = len(df_col_excluded.columns)
    total_row_count = df_col_excluded.distinct().count()
    for n in range(1, len(df_col_excluded.columns) + 1):
        for c in combinations(df_col_excluded.columns, n):
            if df_col_excluded.select(*c).distinct().count() == total_row_count:
                if len(df_col_excluded.select(*c).columns) == total_cols:
                    raise ValueError("No composite key candidates could be identified.")
                return list(df_col_excluded.select(*c).columns)
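
Example usage, as a minimal sketch (assuming an active SparkSession named spark):

import mack

df = spark.createDataFrame(
    [(1, "a", 10), (2, "a", 20), (3, "b", 10)],
    ["id", "group", "value"],
)

# Returns the smallest column combination that is unique across all rows,
# here ["id"]
print(mack.find_composite_key_candidates(df))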

humanize_bytes(n)

Parameters:

n (int, required): The number of bytes to format.

Returns:

str: The human-readable size, using decimal (1000-based) units.

Source code in mack/__init__.py
def humanize_bytes(n: int) -> str:
    """
    <description>

    :param n: <description>
    :type n: int

    :returns: <description>
    :rtype: str
    """
    kilobyte = 1000
    for prefix, k in (
        ("PB", kilobyte**5),
        ("TB", kilobyte**4),
        ("GB", kilobyte**3),
        ("MB", kilobyte**2),
        ("kB", kilobyte**1),
    ):
        if n >= k * 0.9:
            return f"{n / k:.2f} {prefix}"
    return f"{n} B"
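
Example usage:

import mack

print(mack.humanize_bytes(12345678))  # 12.35 MB
print(mack.humanize_bytes(500))       # 500 B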

humanize_bytes_binary(n)

Parameters:

n (int, required): The number of bytes to format.

Returns:

str: The human-readable size, using binary (1024-based) multiples.

Source code in mack/__init__.py
def humanize_bytes_binary(n: int) -> str:
    """
    <description>

    :param n: <description>
    :type n: int

    :returns: <description>
    :rtype: str
    """
    kibibyte = 1024
    for prefix, k in (
        ("PB", kibibyte**5),
        ("TB", kibibyte**4),
        ("GB", kibibyte**3),
        ("MB", kibibyte**2),
        ("kB", kibibyte**1),
    ):
        if n >= k * 0.9:
            return f"{n / k:.2f} {prefix}"
    return f"{n} B"
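
Example usage:

import mack

print(mack.humanize_bytes_binary(12345678))  # 11.77 MB, i.e. 12345678 / 1024**2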

is_composite_key_candidate(delta_table, cols)

Parameters:

delta_table (DeltaTable, required): The Delta table to check.
cols (List[str], required): The columns to test as a composite key.

Returns:

bool: True if the given columns uniquely identify every row, False otherwise.

Raises:

TypeError: Raised when input arguments have an invalid type or are missing.
TypeError: Raised when required columns are not present in the table's columns.

Source code in mack/__init__.py
def is_composite_key_candidate(delta_table: DeltaTable, cols: List[str]) -> bool:
    """
    <description>

    :param delta_table: <description>
    :type delta_table: DeltaTable
    :param cols: <description>
    :type cols: List[str]

    :raises TypeError: Raises type error when input arguments have a invalid type or are missing.
    :raises TypeError: Raises type error when required columns are not in dataframe columns.

    :returns: <description>
    :rtype: bool
    """
    if not isinstance(delta_table, DeltaTable):
        raise TypeError("An existing delta table must be specified.")

    if not cols or len(cols) == 0:
        raise TypeError("At least one column must be specified.")

    data_frame = delta_table.toDF()

    for required_column in cols:
        if required_column not in data_frame.columns:
            raise TypeError(
                f"The base table has these columns {data_frame.columns!r}, but these columns are required {cols!r}"
            )

    duplicate_records = (
        data_frame.withColumn(
            "amount_of_records",
            count("*").over(Window.partitionBy(cols)),
        )
        .filter(col("amount_of_records") > 1)
        .drop("amount_of_records")
    )

    if len(duplicate_records.take(1)) == 0:
        return True

    return False
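
Example usage, as a minimal sketch (hypothetical path and columns, assuming an active SparkSession named spark):

import mack
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/tmp/customers")

if mack.is_composite_key_candidate(delta_table, ["first_name", "last_name"]):
    print("(first_name, last_name) uniquely identifies every row")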

kill_duplicates(delta_table, duplication_columns)

Parameters:

delta_table (DeltaTable, required): The Delta table to remove duplicates from.
duplication_columns (List[str], required): The columns used to identify duplicate rows.

Raises:

TypeError: Raised when input arguments have an invalid type or are empty.
TypeError: Raised when required columns are missing from the provided delta table.

Source code in mack/__init__.py
def kill_duplicates(delta_table: DeltaTable, duplication_columns: List[str]) -> None:
    """
    <description>

    :param delta_table: <description>
    :type delta_table: DeltaTable
    :param duplication_columns: <description>
    :type duplication_columns: List[str]

    :raises TypeError: Raises type error when input arguments have a invalid type or are empty.
    :raises TypeError: Raises type error when required columns are missing in the provided delta table.
    """
    if not isinstance(delta_table, DeltaTable):
        raise TypeError("An existing delta table must be specified.")

    if not duplication_columns or len(duplication_columns) == 0:
        raise TypeError("Duplication columns must be specified")

    data_frame = delta_table.toDF()

    # Make sure that all the required columns are present in the provided delta table
    append_data_columns = data_frame.columns
    for required_column in duplication_columns:
        if required_column not in append_data_columns:
            raise TypeError(
                f"The base table has these columns {append_data_columns!r}, but these columns are required {duplication_columns!r}"
            )

    q = []

    duplicate_records = (
        data_frame.withColumn(
            "amount_of_records",
            count("*").over(Window.partitionBy(duplication_columns)),
        )
        .filter(col("amount_of_records") > 1)
        .drop("amount_of_records")
        .distinct()
    )

    for column in duplication_columns:
        q.append(f"old.{column} = new.{column}")

    q = " AND ".join(q)

    # Remove all the duplicate records
    delta_table.alias("old").merge(
        duplicate_records.alias("new"), q
    ).whenMatchedDelete().execute()
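
Example usage, as a minimal sketch (hypothetical path and columns, assuming an active SparkSession named spark):

import mack
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/tmp/customers")

# Unlike drop_duplicates, this deletes ALL rows that share a
# (first_name, last_name) pair, keeping none of them
mack.kill_duplicates(delta_table, ["first_name", "last_name"])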

latest_version(delta_table)

Parameters:

delta_table (DeltaTable, required): The Delta table to inspect.

Returns:

float: The latest version number of the table.

Source code in mack/__init__.py
def latest_version(delta_table: DeltaTable) -> float:
    """
    <description>

    :param delta_table: <description>
    :type delta_table: DeltaTable

    :returns: <description>
    :rtype: float
    """
    version = delta_table.history().agg(max("version")).collect()[0][0]
    return version
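
Example usage, as a minimal sketch (hypothetical path, assuming an active SparkSession named spark):

import mack
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/tmp/events")
print(mack.latest_version(delta_table))  # e.g. 3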

rename_delta_table(delta_table, new_table_name, table_location=None, databricks=False, spark_session=None)

Renames a Delta table to a new name. This function can be used in a Databricks environment or with a standalone Spark session.

Parameters:

delta_table (DeltaTable): The DeltaTable object representing the table to be renamed.
new_table_name (str): The new name for the table.
table_location (str, optional): The file path where the table is stored. Defaults to None. If None, the function will attempt to determine the location from the DeltaTable object.
databricks (bool, optional): A flag indicating whether the function is being run in a Databricks environment. Defaults to False. If True, a SparkSession must be provided.
spark_session (pyspark.sql.SparkSession, optional): The Spark session. Defaults to None. Required if databricks is set to True.

Returns:

None

Raises:

TypeError: If the provided delta_table is not a DeltaTable object, or if databricks is True and spark_session is None.

Example Usage:

rename_delta_table(existing_delta_table, "new_table_name")

Source code in mack/__init__.py
def rename_delta_table(
    delta_table: DeltaTable,
    new_table_name: str,
    table_location: str = None,
    databricks: bool = False,
    spark_session: pyspark.sql.SparkSession = None,
) -> None:
    """
    Renames a Delta table to a new name. This function can be used in a Databricks environment or with a
    standalone Spark session.

    Parameters:
    delta_table (DeltaTable): The DeltaTable object representing the table to be renamed.
    new_table_name (str): The new name for the table.
    table_location (str, optional): The file path where the table is stored. Defaults to None.
        If None, the function will attempt to determine the location from the DeltaTable object.
    databricks (bool, optional): A flag indicating whether the function is being run in a Databricks
        environment. Defaults to False. If True, a SparkSession must be provided.
    spark_session (pyspark.sql.SparkSession, optional): The Spark session. Defaults to None.
        Required if `databricks` is set to True.

    Returns:
    None

    Raises:
    TypeError: If the provided `delta_table` is not a DeltaTable object, or if `databricks` is True
        and `spark_session` is None.

    Example Usage:
    >>> rename_delta_table(existing_delta_table, "new_table_name")
    """
    if not isinstance(delta_table, DeltaTable):
        raise TypeError("An existing delta table must be specified for delta_table.")
    if databricks and spark_session is None:
        raise TypeError("A spark session must be specified for databricks.")

    if databricks:
        spark_session.sql(f"ALTER TABLE {delta_table.name} RENAME TO {new_table_name}")
    else:
        delta_table.toDF().write.format("delta").mode("overwrite").saveAsTable(
            new_table_name
        )

show_delta_file_sizes(delta_table, humanize_binary=False)

Parameters:

delta_table (DeltaTable, required): The Delta table to inspect.
humanize_binary (bool, optional): Whether to format sizes with binary (1024-based) multiples instead of decimal units. Defaults to False.

Returns:

None

Source code in mack/__init__.py
def show_delta_file_sizes(
    delta_table: DeltaTable, humanize_binary: bool = False
) -> None:
    """
    <description>

    :param delta_table: <description>
    :type delta_table: DeltaTable
    :param humanize_binary: <description>
    :type humanize_binary: bool

    :returns: <description>
    :rtype: None
    """
    details = delta_table.detail().select("numFiles", "sizeInBytes").collect()[0]
    size_in_bytes, number_of_files = details["sizeInBytes"], details["numFiles"]
    average_file_size_in_bytes = round(size_in_bytes / number_of_files, 0)

    if humanize_binary:
        humanized_size_in_bytes = humanize_bytes_binary(size_in_bytes)
        humanized_average_file_size = humanize_bytes_binary(average_file_size_in_bytes)
    else:
        humanized_size_in_bytes = humanize_bytes(size_in_bytes)
        humanized_average_file_size = humanize_bytes(average_file_size_in_bytes)
    humanized_number_of_files = f"{number_of_files:,}"

    print(
        f"The delta table contains {humanized_number_of_files} files with a size of {humanized_size_in_bytes}."
        + f" The average file size is {humanized_average_file_size}"
    )
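
Example usage, as a minimal sketch (hypothetical path and output, assuming an active SparkSession named spark):

import mack
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/tmp/events")
mack.show_delta_file_sizes(delta_table)
# e.g. "The delta table contains 2 files with a size of 1.32 kB. The average file size is 660 B"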

type_2_scd_generic_upsert(delta_table, updates_df, primary_key, attr_col_names, is_current_col_name, effective_time_col_name, end_time_col_name)

Parameters:

delta_table (DeltaTable, required): The target Delta table holding the Type 2 SCD data.
updates_df (DataFrame, required): The DataFrame with the updates to apply.
primary_key (str, required): The column that identifies a record across versions.
attr_col_names (List[str], required): The attribute columns that are tracked for changes.
is_current_col_name (str, required): The name of the column that flags the current version of a record.
effective_time_col_name (str, required): The name of the column holding the time a version became effective.
end_time_col_name (str, required): The name of the column holding the time a version was superseded.

Returns:

None

Raises:

TypeError: Raised when required column names are not in the base table.
TypeError: Raised when required column names are missing from the updates DataFrame.

Source code in mack/__init__.py
def type_2_scd_generic_upsert(
    delta_table: DeltaTable,
    updates_df: DataFrame,
    primary_key: str,
    attr_col_names: List[str],
    is_current_col_name: str,
    effective_time_col_name: str,
    end_time_col_name: str,
) -> None:
    """
    <description>

    :param delta_table: DeltaTable
    :type path: str
    :param updates_df: <description>
    :type updates_df: DataFrame
    :param primary_key: <description>
    :type primary_key: str
    :param attr_col_names: <description>
    :type attr_col_names: List[str]
    :param is_current_col_name: <description>
    :type is_current_col_name: str
    :param effective_time_col_name: <description>
    :type effective_time_col_name: str
    :param end_time_col_name: <description>
    :type effective_time_col_name: str

    :raises TypeError: Raises type error when required column names are not in the base table.
    :raises TypeError: Raises type error when required column names for updates are not in the attributes columns list.

    :returns: <description>
    :rtype: None
    """

    # validate the existing Delta table
    base_col_names = delta_table.toDF().columns
    required_base_col_names = (
        [primary_key]
        + attr_col_names
        + [is_current_col_name, effective_time_col_name, end_time_col_name]
    )
    if sorted(base_col_names) != sorted(required_base_col_names):
        raise TypeError(
            f"The base table has these columns {base_col_names!r}, but these columns are required {required_base_col_names!r}"
        )
    # validate the updates DataFrame
    updates_col_names = updates_df.columns
    required_updates_col_names = (
        [primary_key] + attr_col_names + [effective_time_col_name]
    )
    if sorted(updates_col_names) != sorted(required_updates_col_names):
        raise TypeError(
            f"The updates DataFrame has these columns {updates_col_names!r}, but these columns are required {required_updates_col_names!r}"
        )

    # perform the upsert
    updates_attrs = list(
        map(lambda attr: f"updates.{attr} <> base.{attr}", attr_col_names)
    )
    updates_attrs = " OR ".join(updates_attrs)
    staged_updates_attrs = list(
        map(lambda attr: f"staged_updates.{attr} <> base.{attr}", attr_col_names)
    )
    staged_updates_attrs = " OR ".join(staged_updates_attrs)
    staged_part_1 = (
        updates_df.alias("updates")
        .join(delta_table.toDF().alias("base"), primary_key)
        .where(f"base.{is_current_col_name} = true AND ({updates_attrs})")
        .selectExpr("NULL as mergeKey", "updates.*")
    )
    staged_part_2 = updates_df.selectExpr(f"{primary_key} as mergeKey", "*")
    staged_updates = staged_part_1.union(staged_part_2)
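    # column values for rows inserted as the new current version of a record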
    thing = {}
    for attr in attr_col_names:
        thing[attr] = f"staged_updates.{attr}"
    thing2 = {
        primary_key: f"staged_updates.{primary_key}",
        is_current_col_name: "true",
        effective_time_col_name: f"staged_updates.{effective_time_col_name}",
        end_time_col_name: "null",
    }
    res_thing = {**thing, **thing2}
    res = (
        delta_table.alias("base")
        .merge(
            source=staged_updates.alias("staged_updates"),
            condition=pyspark.sql.functions.expr(f"base.{primary_key} = mergeKey"),
        )
        .whenMatchedUpdate(
            condition=f"base.{is_current_col_name} = true AND ({staged_updates_attrs})",
            set={
                is_current_col_name: "false",
                end_time_col_name: f"staged_updates.{effective_time_col_name}",
            },
        )
        .whenNotMatchedInsert(values=res_thing)
        .execute()
    )
    return res
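
Example usage, as a minimal sketch (hypothetical path and column names, assuming an active SparkSession named spark and a base table with exactly the columns listed below):

import mack
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/tmp/customers_scd")
updates = spark.createDataFrame(
    [(1, "new address", "2023-01-01")],
    ["customer_id", "address", "valid_from"],
)

# Closes out the current version of customer 1 and inserts the new one
mack.type_2_scd_generic_upsert(
    delta_table,
    updates,
    primary_key="customer_id",
    attr_col_names=["address"],
    is_current_col_name="active",
    effective_time_col_name="valid_from",
    end_time_col_name="valid_until",
)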

type_2_scd_upsert(delta_table, updates_df, primary_key, attr_col_names)

Parameters:

delta_table (DeltaTable, required): The target Delta table holding the Type 2 SCD data.
updates_df (DataFrame, required): The DataFrame with the updates to apply.
primary_key (str, required): The column that identifies a record across versions.
attr_col_names (List[str], required): The attribute columns that are tracked for changes.

Returns:

None

Source code in mack/__init__.py
def type_2_scd_upsert(
    delta_table: DeltaTable,
    updates_df: DataFrame,
    primary_key: str,
    attr_col_names: List[str],
) -> None:
    """
    <description>

    :param path: <description>
    :type path: DeltaTable
    :param updates_df: <description>
    :type updates_df: DataFrame
    :param primary_key: <description>
    :type primary_key: str
    :param attr_col_names: <description>
    :type attr_col_names: List[str]

    :returns: <description>
    :rtype: None
    """
    return type_2_scd_generic_upsert(
        delta_table,
        updates_df,
        primary_key,
        attr_col_names,
        "is_current",
        "effective_time",
        "end_time",
    )
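
Example usage, as a minimal sketch (hypothetical path, assuming an active SparkSession named spark and a base table with is_current, effective_time, and end_time columns):

import mack
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/tmp/customers_scd")
updates = spark.createDataFrame(
    [(1, "new address", "2023-01-01")],
    ["customer_id", "address", "effective_time"],
)

mack.type_2_scd_upsert(delta_table, updates, "customer_id", ["address"])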

validate_append(delta_table, append_df, required_cols, optional_cols)

Parameters:

delta_table (DeltaTable, required): The target Delta table to append to.
append_df (DataFrame, required): The DataFrame with rows to append.
required_cols (List[str], required): Columns that must be present in the append DataFrame.
optional_cols (List[str], required): Columns that may be added to the table by the append.

Raises:

TypeError: Raised when input arguments have an invalid type, are missing, or are empty.
TypeError: Raised when required columns are missing from the append DataFrame.
TypeError: Raised when a column in the append DataFrame is not part of the original delta table and is not listed in optional_cols.

Source code in mack/__init__.py
def validate_append(
    delta_table: DeltaTable,
    append_df: DataFrame,
    required_cols: List[str],
    optional_cols: List[str],
) -> None:
    """
    <description>

    :param delta_table: <description>
    :type delta_table: DeltaTable
    :param append_df: <description>
    :type append_df: DataFrame
    :param required_cols: <description>
    :type required_cols: List[str]
    :param optional_cols: <description>
    :type optional_cols: List[str]

    :raises TypeError: Raises type error when input arguments have a invalid type, are missing or are empty.
    :raises TypeError: Raises type error when required columns are missing in the provided delta table.
    :raises TypeError: Raises type error when column in append dataframe is not part of the original delta table..
    """
    if not isinstance(delta_table, DeltaTable):
        raise TypeError("An existing delta table must be specified.")

    if not isinstance(append_df, DataFrame):
        raise TypeError("You must provide a DataFrame that is to be appended.")

    append_data_columns = append_df.columns

    for required_column in required_cols:
        if required_column not in append_data_columns:
            raise TypeError(
                f"The append DataFrame has these columns {append_data_columns!r}, but these columns are required {required_cols!r}"
            )

    table_columns = delta_table.toDF().columns

    for column in append_data_columns:
        if column not in table_columns and column not in optional_cols:
            raise TypeError(
                f"The column {column!r} is not part of the current Delta table."
                + " If you want to add the column to the table you must set the optional_cols parameter."
            )

    details = delta_table.detail().select("location").collect()[0]

    (
        append_df.write.format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .save(details["location"])
    )
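
Example usage, as a minimal sketch (hypothetical path and columns, assuming an active SparkSession named spark):

import mack
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/tmp/customers")
batch = spark.createDataFrame([(1, "Alice", "US")], ["id", "name", "country"])

# id and name must be present; country may be added as a new column
mack.validate_append(delta_table, batch, required_cols=["id", "name"], optional_cols=["country"])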

with_md5_cols(df, cols, output_col_name=None)

Parameters:

df (Union[DeltaTable, DataFrame], required): The Delta table or DataFrame to add the hash column to.
cols (List[str], required): The columns to concatenate and hash.
output_col_name (Optional[str], optional): The name of the output column. Defaults to None, in which case a name is derived from the hashed columns.

Returns:

DataFrame: The DataFrame with the added MD5 column.

Source code in mack/__init__.py
def with_md5_cols(
    df: Union[DeltaTable, DataFrame],
    cols: List[str],
    output_col_name: Optional[str] = None,
) -> DataFrame:
    """
    <description>

    :param df: <description>
    :type df: DeltaTable or DataFrame
    :param cols: <description>
    :type cols: List[str]
    :param output_col_name: <description>
    :type output_col_name: str, defaults to empty string.

    :raises TypeError: Raises type error when no composite key can be found.

    :returns: <description>
    :rtype: DataFrame
    """
    if output_col_name is None:
        output_col_name = "_".join(["md5"] + cols)
    if type(df) == DeltaTable:
        df = df.toDF()
    return df.withColumn(output_col_name, md5(concat_ws("||", *cols)))
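
Example usage, as a minimal sketch (assuming an active SparkSession named spark):

import mack

df = spark.createDataFrame([(1, "Alice", "US")], ["id", "name", "country"])

# Adds a column named md5_name_country holding the MD5 hash of
# concat_ws("||", name, country)
df_with_hash = mack.with_md5_cols(df, ["name", "country"])
df_with_hash.show()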