I have a function that I use to add a column to a DataFrame, for example:
import json

def myfunc(x):
    resp_data = {'status': '1', 'data': x}
    return json.dumps(resp_data)
The original pandas DataFrame df is converted into a Dask DataFrame as follows:
import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=30)
Now I call myfunc on ddf to add a new column, data_json, built from the existing column att:
ddf['data_json'] = ddf.apply(lambda row: myfunc(row['att']),
                             axis=1, result_type='expand', meta=(None, 'str'))
When I call ddf.compute(), it fails with this error:
AttributeError: 'Series' object has no attribute 'columns'
After calling compute(), I need to save ddf to a file using:
ddf.to_csv("myfile.csv", index=False, single_file=True)
How can I handle the error so that the rows that trigger it are skipped, and the rest of the Dask DataFrame is still processed and saved?
A few suggestions:
1. If your function is simple, there is no need to wrap it in a lambda that passes a single value; something like ddf.apply(myfunc, axis=1) should work, with myfunc receiving the whole row. If the function uses several columns, its body should specify how to handle each of them.
2. It turns out json doesn't like numpy dtypes, so each value needs to be converted (for example with int) before dumping.
3. If the dataframe is only being saved to CSV, there is no need to call .compute before, as that would do the same work twice.
4. If myfunc does not depend on neighbouring rows, one could also use .map_partitions.
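To illustrate the second point, here is a minimal sketch (plain json and numpy, no Dask involved) of why the int conversion is needed. The helper to_json_safe is hypothetical, not part of either library; it relies on .item(), which every numpy scalar has, to turn the value into the matching built-in Python type.

```python
import json

import numpy as np


def to_json_safe(data: dict) -> str:
    """Hypothetical helper: convert numpy scalars to built-in Python types, then dump."""
    clean = {k: v.item() if isinstance(v, np.generic) else v for k, v in data.items()}
    return json.dumps(clean)


row_value = np.int64(4)  # what row["y"] looks like inside a row-wise apply

try:
    json.dumps({"status": "1", "y": row_value})
except TypeError as exc:
    # json only knows built-in types, and numpy int64 is not a subclass of int
    print("TypeError:", exc)

print(to_json_safe({"status": "1", "y": row_value}))  # {"status": "1", "y": 4}
```

Note that numpy float64 happens to subclass Python float, so it serializes without conversion; it is mainly the numpy integer (and bool_) scalars that json rejects.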
import json
import dask.dataframe as dd
import pandas as pd
ddf = dd.from_pandas(pd.DataFrame(range(5), columns=["x"]), npartitions=2)
ddf["y"] = 2 * ddf["x"]
def myfunc(row):
    """content of the function should specify how to handle different columns"""
    resp_data = {
        "status": "1",
        "y": int(row["y"]),
        "diff_data": int(row["y"] - row["x"]),
    }
    return json.dumps(resp_data)
ddf["data_json"] = ddf.apply(myfunc, axis=1, result_type="expand", meta=(None, "str"))
print(ddf.compute())
# x y data_json
# 0 0 0 {"status": "1", "y": 0, "diff_data": 0}
# 1 1 2 {"status": "1", "y": 2, "diff_data": 1}
# 2 2 4 {"status": "1", "y": 4, "diff_data": 2}
# 3 3 6 {"status": "1", "y": 6, "diff_data": 3}
# 4 4 8 {"status": "1", "y": 8, "diff_data": 4}
# if the dataframe only needs to be saved, there is no need for separate .compute
# ddf.to_csv("myfile.csv", index=False, single_file=True)
