A Big Data Hadoop and Spark Project for Absolute Beginners
Loading a CSV file and capturing all the bad records is a very common requirement in ETL projects. Most relational database loaders, such as SQL*Loader or nzload, provide this feature, but when it comes to Hadoop and Spark (2.2.0) there is no direct solution for this.
However, a solution to this problem is available in Databricks Runtime 3.0, where you just need to provide a bad records path and all the bad records will get saved there:
df = spark.read \
    .option("badRecordsPath", "/data/badRecPath") \
    .parquet("/input/parquetFile")
However, in previous Spark releases this method doesn't work. We can achieve the same result in two ways:
- Read the input file as an RDD and use RDD transformations to filter out the bad records (a minimal sketch follows this list).
- Use spark.read.csv().
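For the first (RDD) approach, here is a minimal sketch; the input path, the pipe delimiter, and the two-field validation rule are illustrative assumptions and should be adapted to your file layout:

raw_rdd = spark.sparkContext.textFile('/test/data/test.csv')

def is_good(line):
    # Treat a record as good only if it has exactly two pipe-delimited fields
    # and the first field is an integer (mirrors the order_number|total layout used later).
    fields = line.split('|')
    return len(fields) == 2 and fields[0].isdigit()

good_rdd = raw_rdd.filter(is_good)
bad_rdd = raw_rdd.filter(lambda line: not is_good(line))

# bad_rdd now holds the raw text of every record that failed the check
# and can be saved out for inspection.
bad_rdd.saveAsTextFile('/test/data/bad_rdd_output')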
Click here to check out our Udemy course to learn Spark Scala Coding Framework and Best Practices.
In this article we will see how to capture bad records through spark.read.csv(). To load a file and capture the bad records, we need to perform the following steps:
- Create a schema (StructType) for the input file with an extra column of string type (say bad_record) for corrupt records.
- Call spark.read.csv() with all the required parameters and pass the bad record column name (the extra column created in step 1) as the parameter columnNameOfCorruptRecord.
- Filter all the records where "bad_record" is not null and save them as a temporary file.
- Read the temporary file as CSV (spark.read.csv) and pass the same schema as in step 1.
- From this bad-records data-frame, select "bad_record".
Step 5 will give you a data-frame containing all the bad records (steps 3-5 are sketched at the end of this article).
Code:-
#####################Create Schema#####################################
>>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
>>> customSchema = StructType([
...     StructField("order_number", IntegerType(), True),
...     StructField("total", StringType(), True),
...     StructField("bad_record", StringType(), True)
... ])
“bad_record” here is the bad records column.
#################Call spark.read.csv()####################
>>> orders_df = spark.read \
...     .format('com.databricks.spark.csv') \
...     .option("mode", "PERMISSIVE") \
...     .option("columnNameOfCorruptRecord", "bad_record") \
...     .options(header='false', delimiter='|') \
...     .load('/test/data/test.csv', schema=customSchema)
After calling spark.read.csv(), if a record doesn't satisfy the schema, null is assigned to all the columns of that record and the original record text is assigned to the bad record column.
>>> orders_df.show()
+------------+-----+----------+
|order_number|total|bad_record|
+------------+-----+----------+
|           1| 1000|      null|
|           2| 4000|      null|
|        null| null| A|30|3000|
+------------+-----+----------+
NOTE:-
The corrupt record column is generated at run time, when the DataFrame is instantiated and data is actually fetched (by calling an action).
The output of the corrupt record column depends on the other columns that are part of that particular action call.
If the column causing the error is not part of the action call, then bad_record won't show the bad record.
If you want to overcome this issue and want bad_record to persist, then follow steps 3, 4, and 5 (sketched below) or use caching.
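For completeness, here is a minimal sketch of steps 3, 4, and 5, reusing orders_df and customSchema from above; the temporary path /test/data/tmp_bad is an assumption:

from pyspark.sql.functions import col

# Step 3: keep only the rows where the corrupt-record column is populated
# and save them as a temporary pipe-delimited file so the values persist.
orders_df.filter(col("bad_record").isNotNull()) \
    .write.mode("overwrite") \
    .options(header='false', delimiter='|') \
    .csv('/test/data/tmp_bad')

# Step 4: read the temporary file back with the same schema as in step 1.
bad_df = spark.read \
    .options(header='false', delimiter='|') \
    .schema(customSchema) \
    .csv('/test/data/tmp_bad')

# Step 5: the bad_record column of this data-frame holds all the bad records.
bad_df.select("bad_record").show(truncate=False)

Alternatively, caching orders_df (orders_df.cache()) before the first action should keep the computed bad_record values available for subsequent queries, as the NOTE above suggests.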
Click here to check out our Udemy course to learn more about Spark Scala Coding Framework and Best Practices.