I have the following prelude code that is shared between my two scenarios:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import pandas as pd
import numpy as np
spark = SparkSession.builder.getOrCreate()
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [22.0, 88.0, np.nan]})
Now I would like to convert df into a PySpark DataFrame (sdf). When I try to "cast" "col2" implicitly to LongType via a schema while creating sdf, it fails:
schema = StructType([StructField("col1", LongType()), StructField("col2", LongType())])
sdf = spark.createDataFrame(df[schema.fieldNames()], schema=schema)
Error:
TypeError: field col2: LongType can not accept object 22.0 in type <class 'float'>
But if I load col2 as FloatType first and then cast it explicitly, it works just fine:
schema_2 = StructType(
[StructField("col1", LongType()), StructField("col2", FloatType())]
)
sdf = spark.createDataFrame(df[schema_2.fieldNames()], schema=schema_2)
cast_sdf = sdf.withColumn("col2", F.col("col2").cast(LongType()))
cast_sdf.show()
with the output:
+----+----+
|col1|col2|
+----+----+
|   1|  22|
|   2|  88|
|   3|   0|
+----+----+
CodePudding user response:
Turning my comment into an answer.
This is simply how Spark works with schemas. It is not specific to converting a pandas DataFrame into a PySpark DataFrame. You get the same error when calling createDataFrame with a list of tuples:
import numpy as np
schema = StructType([StructField("col1", LongType()), StructField("col2", LongType())])
df = spark.createDataFrame([(1, 22.0), (2, 88.0), (3, np.nan)], schema)
# TypeError: field col2: LongType can not accept object 22.0 in type <class 'float'>
The same applies to data sources such as CSV when you pass a schema (although a CSV read in PERMISSIVE mode does not fail; values that do not match are loaded as null). A schema does not cast types automatically; it only tells Spark which data type to expect for each column of the rows.
So when you supply a schema, you have to either pass data that already matches the specified types, or declare the column as StringType, which does not fail, and then cast it explicitly to the desired type.
schema = StructType([StructField("col1", LongType()), StructField("col2", StringType())])
df = spark.createDataFrame([(1, 22.0), (2, 88.0), (3, np.nan)], schema)
df = df.withColumn("col2", F.col("col2").cast("long"))
df.show()
# +----+----+
# |col1|col2|
# +----+----+
# |   1|  22|
# |   2|  88|
# |   3|null|
# +----+----+
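As a hedged sketch of the first option (passing data that already matches the schema), the conversion can be done in plain Python before Spark sees the rows. The helper name `to_long_or_none` is hypothetical, and the sketch assumes every non-missing value is integer-valued; it raises instead of truncating silently otherwise.

```python
import math

import numpy as np
import pandas as pd


def to_long_or_none(value):
    """Return a Python int for integer-valued numbers and None for NaN/None.

    Hypothetical helper for illustration; not part of pandas or PySpark.
    """
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return None
    if float(value) != int(value):
        raise ValueError(f"{value!r} is not integer-valued")
    return int(value)


pdf = pd.DataFrame({"col1": [1, 2, 3], "col2": [22.0, 88.0, np.nan]})

# Build plain Python tuples whose values are int or None, which is what a
# nullable LongType field accepts.
rows = [
    (to_long_or_none(c1), to_long_or_none(c2))
    for c1, c2 in zip(pdf["col1"].tolist(), pdf["col2"].tolist())
]
print(rows)  # [(1, 22), (2, 88), (3, None)]
```

With a live session, these rows should be accepted by `spark.createDataFrame(rows, schema)` using the two-LongType schema from the question, with the missing value stored as null.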
CodePudding user response:
I don't see how your first operation could succeed. As far as I know, a Double is a double-precision (64-bit) floating-point number, while a Long is a 64-bit integer.
Where NaNs are mixed with integers or integer-valued floats, I would coerce the column to integer in pandas and then impose LongType() when converting to a Spark DataFrame, because the two types match.
Otherwise, I would impose float/double when converting to a Spark DataFrame, as you did, and then cast to LongType(). Why? Because float and double are closely related types.
import numpy as np
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Replace NaN with 0 and convert col2 to integer before handing the data to Spark
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [22.0, 88.0, np.nan]})
df = df.assign(col2=df["col2"].fillna(0).astype(int))
mySchema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", LongType(), True),
])
sdf = spark.createDataFrame(df, schema=mySchema)
sdf.show()
+----+----+
|col1|col2|
+----+----+
|   1|  22|
|   2|  88|
|   3|   0|
+----+----+
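Note that `fillna(0)` turns the missing value into a real 0, which can no longer be told apart from a genuine zero. If that distinction matters, one alternative (an assumption on my side, not something the code above does) is pandas' nullable `Int64` dtype, which keeps the gap as `<NA>`. A minimal sketch, assuming pandas 1.0 or later:

```python
import numpy as np
import pandas as pd

pdf = pd.DataFrame({"col1": [1, 2, 3], "col2": [22.0, 88.0, np.nan]})

# Plain int cast after fillna: the missing value becomes an ordinary 0.
filled = pdf["col2"].fillna(0).astype(int)

# "Int64" (capital I) is the nullable integer extension dtype: the missing
# value stays missing instead of being replaced.
nullable = pdf["col2"].astype("Int64")

print(filled.tolist())             # [22, 88, 0]
print(int(nullable.isna().sum()))  # 1
```

Whether Spark accepts a nullable `Int64` column directly may depend on the PySpark version and on whether Arrow is enabled, so converting the values to plain `int`/`None` first is the more cautious route.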
