In PySpark, how to filter a DataFrame column using unique values from another DataFrame


Here is a common task in PySpark: how do you filter one DataFrame's column using the unique values from another DataFrame?

Method 1

Say we have two DataFrames, df1 and df2, and we want to filter df1 by its column called "id", keeping only the values that also appear in
column "id" of df2. If the set of unique values of column "id" in df2 is not too big, we can do the following:

from pyspark.sql.functions import col

# Create the first DataFrame
df1 = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c"), (4, "d")], ["id", "value"])

# Create the second DataFrame
df2 = spark.createDataFrame([(1, "x"), (2, "y")], ["id", "other_value"])

# Get the unique values of the second DataFrame's column
unique_values = df2.select("id").distinct().rdd.flatMap(lambda x: x).collect()

# Filter the first DataFrame's column based on the unique values
filtered_df1 = df1.filter(col("id").isin(unique_values))

In the above example, filtered_df1 will only contain the rows from df1 where the id column is in the list of unique values from df2’s id column.
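
A small variation worth noting (an equivalent sketch, not part of the original example): if you prefer not to drop down to the RDD API, you can build the same list from the collected Row objects instead:

# Collect the distinct ids as Row objects and pull out the "id" field
unique_values = [row["id"] for row in df2.select("id").distinct().collect()]

# Filter df1 exactly as before
filtered_df1 = df1.filter(col("id").isin(unique_values))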

Method 2

However, the collect()-based approach in Method 1 may not be optimal for a large data size.
In that example, the unique values of the second DataFrame's column are brought to the driver with the .collect() method. This is necessary because the .isin() function used to filter the first DataFrame's column takes an iterable (e.g. a list, set, or tuple) of values to check against.
It's worth mentioning that this can cause performance problems if the second DataFrame is very large and has many unique values; in that case it is better to filter the first DataFrame with a join (or an SQL subquery) against the second DataFrame instead of collecting the unique values.

For example, if you want to filter the first DataFrame based on the second DataFrame’s id column:

filtered_df1 = df1.join(df2, df1.id == df2.id, 'inner').select(df1.columns)

This gives you the same result as the previous example, but without collecting the unique values to the driver. Note that because df1 and df2 both have an "id" column here, selecting "id" by name after the join can raise an ambiguous-column error; the section below shows how to deal with that.
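
One alternative worth mentioning (a sketch, not part of the original example) is a left semi join, which expresses the same "keep the df1 rows whose id appears in df2" filter directly. It never brings df2's columns into the result, so no select() is needed afterwards and duplicate column names cannot cause problems:

# Keep only df1 rows whose "id" has a match in df2; df2's columns are not included in the result
filtered_df1 = df1.join(df2, on="id", how="left_semi")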

Possible error if the two DataFrames share column names

What if df1 and df2 have columns with the same name? Then the code above will fail with an ambiguous-column error.
But don't worry, it's easy to fix. We just need to rename the conflicting column in one of the DataFrames.

If both DataFrames have columns with the same name, you will need to rename one of them before performing the join, for example with the alias() function or selectExpr().
For example, if both DataFrames have a column named “id”:

from pyspark.sql.functions import col

# Create the first DataFrame
df1 = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c"), (4, "d")], ["id", "value"])

# Create the second DataFrame
df2 = spark.createDataFrame([(1, "x"), (2, "y")], ["id", "other_value"])

# Assign a new name to the second DataFrame's 'id' column
df2 = df2.selectExpr("id as df2_id", "other_value")

# Perform the join
filtered_df1 = df1.join(df2, df1.id == df2.df2_id, 'inner').select(df1.columns)
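
For completeness, here is an equivalent sketch that uses the alias() function mentioned above instead of selectExpr(); it recreates df2 so the snippet stands on its own:

from pyspark.sql.functions import col

# Recreate the second DataFrame with its original column names
df2 = spark.createDataFrame([(1, "x"), (2, "y")], ["id", "other_value"])

# Rename the 'id' column with alias() instead of selectExpr()
df2_renamed = df2.select(col("id").alias("df2_id"), col("other_value"))

# Perform the join on the renamed column
filtered_df1 = df1.join(df2_renamed, df1.id == df2_renamed.df2_id, 'inner').select(df1.columns)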

Rename all DataFrame columns at once

Another, probably quicker, method is to rename all of the columns at once.
So in PySpark, how do you rename all of a DataFrame's columns by adding a prefix?

In PySpark, you can use the selectExpr() function along with a list of string expressions to rename all of the DataFrame’s columns by adding a prefix.

Here is an example of how you can add a prefix “prefix_” to all of the columns in a DataFrame:

# Create a DataFrame
df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c"), (4, "d")], ["id", "value"])

# Get the current column names
old_columns = df.columns

# Create the new column names by adding the prefix
new_columns = ["prefix_" + c for c in old_columns]

# Use the selectExpr() function to rename the columns
df = df.selectExpr(*[f"{old} as {new}" for old, new in zip(old_columns, new_columns)])

In this example, the selectExpr() function renames all of the columns by adding the prefix "prefix_" to each original column name. A list comprehension builds string expressions of the form "old as prefix_old", which are then passed to selectExpr().

Also, you can use the withColumnRenamed() method to rename all columns one by one.

for c in old_columns:
    df = df.withColumnRenamed(c, f"prefix_{c}")

This loop renames the columns one at a time, passing the original column name and the new name with the "prefix_" prefix to withColumnRenamed().
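
A third option worth knowing about is toDF(), which replaces all column names in a single call. A minimal sketch, starting again from a freshly created df:

# Create a DataFrame with the original column names
df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c"), (4, "d")], ["id", "value"])

# toDF() takes the full list of new names and applies them in column order
df = df.toDF(*[f"prefix_{c}" for c in df.columns])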

How to deep copy a DataFrame in PySpark

So if you don't want to change the original columns of the DataFrame, just operate on a copy.
Note that Spark DataFrames are immutable: transformations such as withColumnRenamed() never modify the DataFrame they are called on; they return a new one. Also, unlike pandas, the PySpark DataFrame API does not provide a .copy() method, so calling df.copy() on a Spark DataFrame raises an AttributeError.

If you want an explicitly independent copy anyway, you can create a new DataFrame from the original DataFrame's data and schema:

# Create the original DataFrame
df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c"), (4, "d")], ["id", "value"])

# Create a new DataFrame from the original DataFrame's data
df_copy = spark.createDataFrame(df.rdd, df.schema)
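
As a quick check that the copy is independent of the original (reusing the "prefix_" rename from the previous section purely as an example):

# Rename a column on the copy only
df_copy = df_copy.withColumnRenamed("id", "prefix_id")

print(df.columns)       # ['id', 'value']
print(df_copy.columns)  # ['prefix_id', 'value']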

Author: robot learner