AI-Based Data Transformation: A Comparison of LLM-Generated PySpark Code (Using GPT-4)
Welcome to the first installment of our series comparing data transformation code generated by various large language models (LLMs). In this series, we aim to explore how different AI models approach data engineering and analytical tasks under identical conditions.
Introduction to the Series
This series will provide insights into the capabilities and nuances of the coding solutions provided by five major LLMs: ChatGPT, Google Gemini, Mistral, Llama, and Claude. By evaluating the code generated by each model for the same data use case, we can better understand their efficiency, readability, and innovative approaches.
For Series 1.1, we're focusing on PySpark code generation using the GPT-4 model. PySpark is a popular tool for handling big data.
Preselected Data and Use Case
We will use ERCOT's "2-Day Ancillary Services Reports" for our comparison. This dataset comprises various reports including:
NP3-959-EX: 48-hour Aggregate AS Offers
NP3-960-EX: 48-hour Self-Arranged AS
NP3-961-EX: 48-hour Cleared DAM AS
The data is collected daily, with each day's reports compiled into a zipped file containing multiple CSVs; for this comparison we use the files for the month of April. The task involves ingesting this data into a SQL database using PySpark, transforming and analyzing it along the way, with a focus on operations like aggregating and summarizing ancillary service offers and transactions.
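As a rough illustration of the kind of summarization this task targets, once the cleared DAM AS report has been parsed into a DataFrame, an aggregation might look like the sketch below. Note that cleared_as_df and the column names are hypothetical placeholders; the actual ERCOT CSV headers may differ.
from pyspark.sql import functions as F

# cleared_as_df is assumed to be a DataFrame parsed from the NP3-961-EX
# "48-hour Cleared DAM AS" CSVs; the column names below are placeholders.
cleared_summary = (
    cleared_as_df
    .groupBy("DeliveryDate", "AncillaryType")
    .agg(F.sum("ClearedQuantity").alias("total_cleared_mw"))
    .orderBy("DeliveryDate", "AncillaryType")
)
cleared_summary.show()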
Prompt Used
We have a folder with Ercot's "2-Day Ancillary Services Reports". This report contains all 48 Hour disclosure data related to DAM. The following individual files are included in the report: NP3-959-EX 48-hour Aggregate AS Offers; NP3-960-EX 48-hour Self-Arranged AS; NP3-961-EX 48-hour Cleared DAM AS (previously named 48 Hour Ancillary Services Reports).
We gathered daily data as a zipped file containing multiple CSVs inside the Zipped file for the month of April. I have added all the zipped files in a master folder. So now there is a Master folder with a 30 days zipped file. Each zipped file has multiple CSV with data listed above.
Can you give me step by step procedure to do the following:
1) Ingest the CSV file within zipped folders using PySpark
2) filter out NP3-960-EX 48-hour Self-Arranged AS and NP3-961-EX 48-hour Cleared DAM AS
3) Ingest the file in the MS SQL database that I recently set up on my Azure.
ChatGPT's PySpark Answer
1. Ingest the CSV File within Zipped Folders Using PySpark
To begin, ensure that your Azure Databricks or any Spark environment is set up and configured to access your Azure storage where the zipped files are stored.
Step 1.1: Initialize Spark Session
First, set up your Spark session. If you're using Azure Databricks, you can connect it directly to Azure Synapse Analytics.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Ercot Data Ingestion") \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-azure:3.2.0,com.microsoft.azure:azure-storage:8.6.6") \
.getOrCreate()
Step 1.2: Read Zipped Files
Spark can read zipped files directly by specifying the option to read them as binary files, and then you can extract the content.
from pyspark.sql.functions import input_file_name, expr
import zipfile
import io
# Path to the folder containing zipped files
zip_files_path = "path_to_your_container/folder_with_zip_files/*.zip"
# Read the zipped files as binary files
binary_files = spark.read.format("binaryFile") \
.option("pathGlobFilter", "*.zip") \
.load(zip_files_path) \
.select("path", "content")
def unzip_files(content):
    with zipfile.ZipFile(io.BytesIO(content)) as z:
        return {name: z.read(name) for name in z.namelist()}
# UDF to extract files from zip
unzip_udf = udf(unzip_files, MapType(StringType(), BinaryType()))
# Apply the UDF to extract CSVs from zips
extracted_files = binary_files.withColumn("files", unzip_udf("content"))
Step 1.3: Flatten and Read CSVs
After extracting, you need to flatten the structure and read the CSV files.
from pyspark.sql.functions import explode
# Flatten the files and select CSV only
flat_files = extracted_files.select(explode("files").alias("filename", "content")) \
.filter(col("filename").endsWith(".csv"))
# Convert binary content to string and read CSV
def binary_to_string(binary):
    return binary.decode("utf-8")
binary_to_string_udf = udf(binary_to_string, StringType())
csv_data = flat_files.withColumn("csv_content", binary_to_string_udf("content"))
# Optionally, convert CSV content to DataFrame
# This step might need adjustment based on CSV structure
# import pandas as pd
# from pyspark.sql.types import *
# schema = StructType([...]) # Define schema if needed
# def read_csv(content):
# pdf = pd.read_csv(io.StringIO(content))
# return spark.createDataFrame(pdf, schema=schema)
# csv_df = csv_data.withColumn("df", read_csv("csv_content"))
2. Filter Out Specific Files
Step 2.1: Filter Specific Data Files
You can filter out specific files based on their names.
filtered_df = csv_data.filter(
(col("filename") == "NP3-960-EX 48-hour Self-Arranged AS.csv") |
(col("filename") == "NP3-961-EX 48-hour Cleared DAM AS.csv")
)
3. Ingest the File in SQL Database in Azure Synapse Analytics
Step 3.1: Write to Azure Synapse
Finally, write the DataFrame to your SQL database in Azure Synapse Analytics.
server_name = "your_server_name.database.windows.net"
database_name = "your_database_name"
url = f"jdbc:sqlserver://{server_name};databaseName={database_name}"
table_name = "your_table_name"
properties = {
"user": "your_username",
"password": "your_password",
"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
filtered_df.write.jdbc(url=url, table=table_name, mode="append", properties=properties)
This will append the filtered data to your specified table in the Azure Synapse Analytics SQL database.
Make sure to adjust paths, schema, and authentication details according to your environment and security standards.
Implementation Results
No issues on the Spark File Initialization step
I got the following issue with the Read Zipped Files step:
NameError: name 'udf' is not defined
- I fixed this error by using specific import syntax
from pyspark.sql.functions import udf
NameError: name 'MapType' is not defined
I fixed this error by adding the following import statements, as recommended by GPT-4:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, explode
from pyspark.sql.types import StringType, BinaryType, MapType
I encountered the following issue with the Flatten and Read CSVs step:
TypeError: 'Column' object is not callable
GPT-4 suggested I remove this syntax:
# Flatten the files and select CSV only
flat_files = extracted_files.select(explode("files").alias("filename", "content")) \
    .filter(col("filename").endsWith(".csv"))
It suggested replacing it with the following, after which I was able to run the Flatten and Read CSVs step:
# Explode the map of files to rows
flat_files = extracted_files.select(explode("files").alias("filename", "content"))
# Filter to include only CSV files if necessary
csv_files = flat_files.filter(col("filename").endswith(".csv"))
No issue on the Filter Specific Data Files step.
I got a
Py4JJavaError: An error occurred while calling o91.jdbc.
on the Ingestion to SQL Database step. However, this was my mistake: I had forgotten to create the target table in the SQL database on Azure. After creating the table, I was able to ingest the transformed data without any issues.
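The table-creation step itself is not shown in the generated code. As a reference, a minimal sketch of pre-creating the target table from Python with pyodbc might look like the following; the driver name, credentials, and column layout are assumptions and should be adjusted to match the DataFrame being written.
import pyodbc

# Assumed connection details -- replace with your own server, database, and credentials.
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=your_server_name.database.windows.net;"
    "DATABASE=your_database_name;"
    "UID=your_username;PWD=your_password"
)
cursor = conn.cursor()
# Hypothetical table layout: one row per extracted CSV file (filename plus raw CSV text).
cursor.execute("""
    IF OBJECT_ID('your_table_name', 'U') IS NULL
    CREATE TABLE your_table_name (
        filename NVARCHAR(256),
        csv_content NVARCHAR(MAX)
    )
""")
conn.commit()
conn.close()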
Conclusion on GPT-4's Performance
GPT-4's approach to generating PySpark code for the ERCOT data transformation task showcased a blend of effectiveness and areas needing adjustment. Initially, the code handled the Spark File Initialization step seamlessly, indicating a strong foundational coding structure. However, several issues emerged during the Read Zipped Files and Flatten and Read CSVs steps, which required additional interventions:
Import Statements: Errors such as NameError highlighted the necessity of explicit import statements for functions like udf and types like MapType, which were not included in the initial code. These were rectified by incorporating the correct import syntax, demonstrating the importance of comprehensive context awareness in code generation.
File Handling: The initial method for flattening and reading the CSV files led to a TypeError, which was resolved by modifying the approach to file extraction and filtering. This correction improved the code's functionality, allowing the subsequent data processing steps to proceed without further issues.
Database Ingestion: The error during SQL database ingestion was due to a setup oversight rather than a code issue, highlighting the importance of environment readiness in data workflows.
Overall, GPT-4 effectively generated a foundational script but required some manual adjustments to address specific Python and PySpark nuances. These tweaks were essential for achieving a fully functional pipeline, underscoring the need for user oversight and intervention when working with AI-generated code.
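For reference, a consolidated sketch of the corrected pipeline, with the fixes described above applied (explicit imports and endswith in place of endsWith), might look like the following; the storage path, table name, and credentials are placeholders.
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, explode
from pyspark.sql.types import StringType, BinaryType, MapType
import zipfile
import io

spark = SparkSession.builder \
    .appName("Ercot Data Ingestion") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-azure:3.2.0,com.microsoft.azure:azure-storage:8.6.6") \
    .getOrCreate()

# Load every daily zip in the master folder as a binary file
binary_files = spark.read.format("binaryFile") \
    .option("pathGlobFilter", "*.zip") \
    .load("path_to_your_container/folder_with_zip_files/*.zip") \
    .select("path", "content")

# Extract each zip's members into a map of {filename: raw bytes}
def unzip_files(content):
    with zipfile.ZipFile(io.BytesIO(content)) as z:
        return {name: z.read(name) for name in z.namelist()}

unzip_udf = udf(unzip_files, MapType(StringType(), BinaryType()))
extracted_files = binary_files.withColumn("files", unzip_udf("content"))

# One row per extracted file; keep only CSVs (note lowercase endswith)
flat_files = extracted_files.select(explode("files").alias("filename", "content"))
csv_files = flat_files.filter(col("filename").endswith(".csv"))

# Decode the raw bytes and keep only the two requested reports
binary_to_string_udf = udf(lambda b: b.decode("utf-8"), StringType())
csv_data = csv_files.withColumn("csv_content", binary_to_string_udf("content"))
filtered_df = csv_data.filter(
    (col("filename") == "NP3-960-EX 48-hour Self-Arranged AS.csv") |
    (col("filename") == "NP3-961-EX 48-hour Cleared DAM AS.csv")
)

# Write only the columns to persist into the pre-created Azure SQL table
url = "jdbc:sqlserver://your_server_name.database.windows.net;databaseName=your_database_name"
properties = {
    "user": "your_username",
    "password": "your_password",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}
filtered_df.select("filename", "csv_content") \
    .write.jdbc(url=url, table="your_table_name", mode="append", properties=properties)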
Looking Forward
In future installments of this blog series, we will continue to explore how different LLMs, including Llama-3-70B, handle the same PySpark data transformation task under identical conditions. This will allow us to comprehensively compare the capabilities, efficiencies, and innovations brought by each model. Stay tuned for our next analysis where we'll delve into Llama-3-70B's approach to transforming ERCOT's data using PySpark, providing further insights into the evolving landscape of AI-driven data engineering.
About Us: Turboline empowers data-driven decisions with cutting-edge data engineering and AI solutions. For more information on us, visit www.turboline.ai.