Is spark.read.load() an action or transformation? It takes time with this statement alone

Time:02-04

I tried just loading the data with the code below, and even without any further action on the DataFrame it takes a long time. The bigger the file, the longer it takes.

print("STARTED")
biglog_df = (
    spark.read.format("csv").option("header", True).option("inferSchema", True)
    .option("path", "bigLog.txt").load()
)

print("DONE STARTING")

It took around 20 seconds to print "DONE STARTING" for a 4 GB file, and more than a minute for a 25 GB file. Does this mean that Spark is actually reading the data at this point? Is load an action?

CodePudding user response:

The load operation is not lazily evaluated if you set the inferSchema option to True. In that case, Spark launches a job to scan the file and infer the column types.

You can avoid this by supplying the schema when you read the file.

You can observe this behavior with this test:

  1. Open a new interactive session in pyspark;
  2. Open Spark UI > Pyspark Session > Jobs

Then run:

df = (
  spark.read.format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .option("path", "s3a://first-street-climate-risk-statistics-for-noncommercial-use/01_DATA/Climate_Risk_Statistics/v1.3/Zip_level_risk_FEMA_FSF_v1.3.csv")
  .load()
)

You will notice that jobs will be launched to scan (part of) the file to infer the schema.
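
Why inference needs a scan can be seen in a toy sketch in plain Python. This is not Spark's inference code, only a simplified illustration: `infer_type` and `infer_schema` are hypothetical helpers, and the type names are borrowed from the schema used in this answer. A column that looks like an integer in the first rows can turn out to be a double further down, so the type is only known after the rows have been read.

```python
import csv
import io


def infer_type(values):
    """Return the narrowest type name that fits every non-empty value."""
    def fits(cast):
        try:
            for value in values:
                if value != "":
                    cast(value)
            return True
        except ValueError:
            return False

    if fits(int):
        return "integer"
    if fits(float):
        return "double"
    return "string"


def infer_schema(text):
    """Read every row of a CSV string and infer one type per column."""
    rows = list(csv.reader(io.StringIO(text)))
    header, data = rows[0], rows[1:]
    columns = list(zip(*data))  # transpose rows into columns
    return {name: infer_type(col) for name, col in zip(header, columns)}


first_rows = "zipcode,pct\n1001,1\n1002,2\n"
whole_file = first_rows + "1003,3.5\n"

print(infer_schema(first_rows))  # pct looks like an integer so far
print(infer_schema(whole_file))  # the last row forces pct to double
```

The cost of this pass grows with the amount of data read, which matches the timings reported in the question.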

If you instead load the file with an explicit schema:

import json
from pyspark.sql.types import StructType

json_schema = '{"fields":[{"metadata":{},"name":"zipcode","nullable":true,"type":"integer"},{"metadata":{},"name":"count_property","nullable":true,"type":"integer"},{"metadata":{},"name":"count_fema_sfha","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fema_sfha","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2020_5","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2020_5","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2050_5","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2050_5","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2020_100","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2020_100","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2050_100","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2050_100","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2020_500","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2020_500","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2050_500","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2050_500","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_fema_difference_2020","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_fema_difference_2020","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_score_all","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_score_2_10","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_fsf_2020_100","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_fsf_2020_500","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_score_sfha","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_score_no_sfha","nullable":true,"type":"double"},{"metadata":{},"name":"count_floodfactor1","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor2","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor3","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor4","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor5","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor6","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor7","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor8","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor9","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor10","nullable":true,"type":"integer"}],"type":"struct"}'

schema = StructType.fromJson(json.loads(json_schema))

df = (
  spark.read.format("csv")
  .schema(schema)
  .option("header", True)
  .option("path", "s3a://first-street-climate-risk-statistics-for-noncommercial-use/01_DATA/Climate_Risk_Statistics/v1.3/Zip_level_risk_FEMA_FSF_v1.3.csv")
  .load()
)

Spark will launch no jobs, because the schema is supplied explicitly and nothing has to be inferred from the data.
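
If the JSON form feels heavy, the reader's schema() method should also accept a DDL-formatted string such as "col0 INT, col1 DOUBLE". Below is a minimal sketch that converts a JSON schema like the one above into that form using only the standard library. The helper name `json_schema_to_ddl` is hypothetical, the two-field `short_schema` is just a shortened copy for illustration, and the helper assumes a flat schema whose field types are plain strings (no nested structs or arrays).

```python
import json


def json_schema_to_ddl(json_schema):
    """Convert a flat Spark JSON schema string into a DDL string.

    Assumes every field's "type" is a primitive type name such as
    "integer" or "double"; nested types are not handled.
    """
    fields = json.loads(json_schema)["fields"]
    # Backticks keep column names with unusual characters valid.
    return ", ".join(f"`{field['name']}` {field['type']}" for field in fields)


# Shortened copy of the schema, two fields only, for illustration.
short_schema = (
    '{"fields":['
    '{"metadata":{},"name":"zipcode","nullable":true,"type":"integer"},'
    '{"metadata":{},"name":"pct_fema_sfha","nullable":true,"type":"double"}'
    '],"type":"struct"}'
)

ddl = json_schema_to_ddl(short_schema)
print(ddl)

# With a live session, the string would be passed in place of the StructType:
# df = spark.read.format("csv").schema(ddl).option("header", True).load(path)
```

Either way the point is the same: the column types come from you rather than from a pass over the file.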

I hope it helps. See you.

CodePudding user response:

As already explained by @rodrigo, the CSV option inferSchema implies a pass over the whole CSV file to infer the schema.

You can change this behavior by providing the schema yourself (written by hand if you like, for example with a case class if you are on Scala), or by using the samplingRatio option, which indicates what fraction of the file to use for inference, so that setting up the DataFrame is faster.

All of these options are explained in the documentation, which you can find here: DataFrame reader documentation with options for CSV file reading

biglog_df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .option("samplingRatio", 0.01)  # infer the schema from about 1% of the rows
    .option("path", "bigLog.txt")
    .load()
)

Best regards
