I've a table with (millions of) entries along the lines of the following example read into a Spark dataframe (sdf):
| Id | C1 | C2 |
|---|---|---|
| xx1 | c118 | c219 |
| xx1 | c113 | c218 |
| xx1 | c118 | c214 |
| acb | c121 | c201 |
| e3d | c181 | c221 |
| e3d | c132 | c252 |
| abq | c141 | c290 |
| ... | ... | ... |
| vy1 | c13023 | C23021 |
I'd like to get a smaller subset of these Id's for further processing. I identify the unique set of Id's in the table using sdf_id = sdf.select("Id").dropDuplicates().
What is an efficient way from here to filter the data (C1, C2) related to, let's say, 100 randomly selected Id's?
CodePudding user response:
Since you already have the list of unique ids, you can sample it down to your desired fraction and filter on the result.
There are other ways to sample random ids, which can be found here
Sampling
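# sample() takes a fraction: with ~1 million unique Ids, 100 of them is 0.0001 (0.01%); the count returned is approximate
# collect() returns Row objects, so pull out the Id values
sdf_id = [r["Id"] for r in sdf.select("Id").dropDuplicates().sample(fraction=0.0001).collect()]
Filter
from pyspark.sql import functions as F
sdf_filtered = sdf.filter(F.col("Id").isin(sdf_id))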
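Because sample() expects a fraction rather than a number of rows, the fraction has to be derived from the number of unique Ids. Here is a minimal sketch of that arithmetic; `sample_fraction` and its `safety` parameter are hypothetical names I made up for illustration, not part of the Spark API.

```python
def sample_fraction(target_ids: int, total_ids: int, safety: float = 1.0) -> float:
    """Fraction to pass to DataFrame.sample() to get roughly target_ids ids.

    safety > 1.0 oversamples a little, since sample() only returns
    approximately fraction * total_ids rows.
    """
    if total_ids <= 0:
        raise ValueError("total_ids must be positive")
    # A fraction above 1.0 makes no sense without replacement, so cap it.
    return min(1.0, safety * target_ids / total_ids)


# 100 ids out of 1,000,000 unique ids
print(sample_fraction(100, 1_000_000))
```

In practice total_ids would come from something like sdf.select("Id").dropDuplicates().count(), which costs one extra pass over the data.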
CodePudding user response:
There are several ways to achieve what you want.
My sample data
df = spark.createDataFrame([
(1, 'a'),
(1, 'b'),
(1, 'c'),
(2, 'd'),
(2, 'e'),
(3, 'f'),
], ['id', 'col'])
The initial step is getting the sample IDs that you wanted
ids = df.select('id').distinct().sample(0.2) # 0.2 is 20%, you can adjust this
+---+
| id|
+---+
|  1|
+---+
Approach #1: using inner join
Since you have two dataframes, a single inner join returns all records from df for each id in ids. F.broadcast is a hint that improves performance, because ids is supposed to be small enough to be broadcast; feel free to remove it if you want to. Performance-wise, this approach is preferred.
df.join(F.broadcast(ids), on=['id'], how='inner').show()
+---+---+
| id|col|
+---+---+
|  1|  a|
|  1|  b|
|  1|  c|
+---+---+
Approach #2: using isin
You can't pass ids.collect() directly to isin, because it returns a list of Row objects; you have to loop through it and extract the column that you want (id in this case).
df.where(F.col('id').isin([r['id'] for r in ids.collect()])).show()
+---+---+
| id|col|
+---+---+
|  1|  a|
|  1|  b|
|  1|  c|
+---+---+
