How do I add a column to a nested struct in a pyspark dataframe?
I have a dataframe with a schema like
root
|-- state: struct (nullable = true)
| |-- fld: integer (nullable = true)
and I'd like to add columns within the state struct, that is create a dataframe with a schema like
root
|-- state: struct (nullable = true)
| |-- fld: integer (nullable = true)
| |-- a: integer (nullable = true)
But instead I'm getting
root
|-- state: struct (nullable = true)
| |-- fld: integer (nullable = true)
|-- state.a: integer (nullable = true)
This is from trying
df.withColumn('state.a', val)
3 Answers
Here is a way to do it without using a udf:
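# create example dataframe
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, IntegerType
data = [
    ({'fld': 0},)  # one row; the dict becomes the 'state' struct
]
schema = StructType(
    [
        StructField('state',
            StructType(
                [StructField('fld', IntegerType())]
            )
        )
    ]
)
# sqlCtx is an existing SQLContext
df = sqlCtx.createDataFrame(data, schema)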
df.printSchema()
#root
# |-- state: struct (nullable = true)
# | |-- fld: integer (nullable = true)
Now use withColumn() and add the new field using lit() and alias().
val = 1
df_new = df.withColumn(
    'state',
    f.struct(*[f.col('state')['fld'].alias('fld'), f.lit(val).alias('a')])
)
df_new.printSchema()
#root
# |-- state: struct (nullable = false)
# | |-- fld: integer (nullable = true)
# | |-- a: integer (nullable = false)
If you have a lot of fields in the nested struct you can use a list comprehension, using df.schema["state"].dataType.names to get the field names. For example:
val = 1
s_fields = df.schema["state"].dataType.names # ['fld']
df_new = df.withColumn(
    'state',
    f.struct(*([f.col('state')[c].alias(c) for c in s_fields] + [f.lit(val).alias('a')]))
)
df_new.printSchema()
#root
# |-- state: struct (nullable = false)
# | |-- fld: integer (nullable = true)
# | |-- a: integer (nullable = false)
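The same rebuild can also be written as a Spark SQL expression and passed to f.expr(). The following is only a sketch under stated assumptions: the helper name struct_with_field_expr is hypothetical, field names are assumed to be plain identifiers (embedded backticks are not escaped), and the new value is supplied as a trusted SQL literal string.

```python
def struct_with_field_expr(struct_col, existing_fields, new_field, value_sql):
    """Build a Spark SQL named_struct(...) expression that copies the
    existing sub-fields of `struct_col` and appends `new_field`.

    Hypothetical helper: `value_sql` is inserted verbatim, so it must be a
    trusted SQL literal or expression (for example "1" or "'x'").
    """
    parts = []
    for name in existing_fields:
        # named_struct takes alternating (field name, value) arguments
        parts.append("'{0}', `{1}`.`{0}`".format(name, struct_col))
    parts.append("'{0}', {1}".format(new_field, value_sql))
    return "named_struct({0})".format(", ".join(parts))


print(struct_with_field_expr("state", ["fld"], "a", "1"))
# named_struct('fld', `state`.`fld`, 'a', 1)
```

With the example dataframe, the resulting string could be used as df.withColumn('state', f.expr(struct_with_field_expr('state', df.schema['state'].dataType.names, 'a', '1'))).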
I see, use withColumn to replace the struct with a new struct, so copy over the old fields. This works, thanks! I wonder if there is a way to add a field to the struct without having to name all the existing sub-fields?
– MrCartoonology
Feb 14 '18 at 17:12
@MrCartoonology I found a cleaner way to get the field names. See the update.
– pault
Feb 15 '18 at 14:55
Thanks @pault, your answer really helped me :)
– Ajit K'sagar
Jun 12 '18 at 11:11
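On the question raised in the comments above about not naming every existing sub-field: in PySpark 3.1 and later, which postdates this thread, Column.withField adds or replaces a field in a struct by name. A minimal sketch, assuming PySpark 3.1+; the wrapper name add_struct_field is hypothetical:

```python
def add_struct_field(df, struct_col, field_name, value):
    """Return a new DataFrame in which `field_name` has been added to the
    struct column `struct_col`; existing sub-fields are kept without
    being listed.

    Assumes PySpark >= 3.1, where Column.withField is available.
    """
    # Imported lazily so this definition can be loaded without PySpark.
    from pyspark.sql import functions as f

    return df.withColumn(
        struct_col,
        f.col(struct_col).withField(field_name, f.lit(value)),
    )
```

For the dataframe in the question this would be called as add_struct_field(df, 'state', 'a', 1). On older versions the struct still has to be rebuilt as shown in the answer.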
This answer is late, but the following is supported in PySpark 2.x.x.
Assuming dfOld already contains state and fld as described in the question:
from pyspark.sql.functions import col, lit, struct
# withColumn needs a Column, so wrap the literal in lit() and keep the result
dfOld = dfOld.withColumn("a", lit("value"))
dfNew = dfOld.select("level1Field1", "level1Field2", struct(col("state.fld").alias("fld"), col("a")).alias("state"))
Reference: https://medium.com/@mrpowers/adding-structtype-columns-to-spark-dataframes-b44125409803
from pyspark.sql.functions import col, lit, struct
from pyspark.sql.types import *

def add_field_in_dataframe(nfield, df, dt):
    # nfield is a dotted path such as "state.a"; dt is the new field's data type
    fields = nfield.split(".")
    print(fields)
    n = len(fields)
    addField = fields[0]
    if n == 1:
        # top-level column: add it directly as a typed null
        return df.withColumn(addField, lit(None).cast(dt))
    nestedField = ".".join(fields[:-1])
    sfields = df.select(nestedField).schema[fields[-2]].dataType.names
    print(sfields)
    ac = col(nestedField)
    # rebuild the innermost struct: existing sub-fields plus the new null field
    if n == 2:
        nc = struct(*([ac[c].alias(c) for c in sfields] + [lit(None).cast(dt).alias(fields[-1])]))
    else:
        nc = struct(*([ac[c].alias(c) for c in sfields] + [lit(None).cast(dt).alias(fields[-1])])).alias(fields[-2])
    print(nc)
    n = n - 1
    # walk outwards, rebuilding each enclosing struct around the new inner one
    while n > 1:
        print("n: ", n)
        fields = fields[:-1]
        print("fields: ", fields)
        nestedField = ".".join(fields[:-1])
        print("nestedField: ", nestedField)
        sfields = df.select(nestedField).schema[fields[-2]].dataType.names
        print(fields[-1])
        print("sfields: ", sfields)
        # drop the old copy of the struct that is being replaced
        sfields = [s for s in sfields if s != fields[-1]]
        print("sfields: ", sfields)
        ac = col(".".join(fields[:-1]))
        if n > 2:
            print(fields[-2])
            nc = struct(*([ac[c].alias(c) for c in sfields] + [nc])).alias(fields[-2])
        else:
            nc = struct(*([ac[c].alias(c) for c in sfields] + [nc]))
        n = n - 1
    return df.withColumn(addField, nc)
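The function above comes without an explanation. As read from the code, it splits the dotted path and rebuilds each enclosing struct from the innermost one outwards, appending the rebuilt child as the last field of its parent. The order of structs it visits can be illustrated in plain Python; the helper name parent_paths is hypothetical and is not part of the answer.

```python
def parent_paths(nfield):
    """List the struct paths that enclose the dotted field `nfield`,
    innermost first. Illustration only; no Spark required."""
    fields = nfield.split(".")
    # Drop one trailing component at a time: "a.b.c" -> "a.b" -> "a"
    return [".".join(fields[:i]) for i in range(len(fields) - 1, 0, -1)]


print(parent_paths("a.b.c"))    # ['a.b', 'a']
print(parent_paths("state.a"))  # ['state']
print(parent_paths("a"))        # []
```

For the question's schema, a call such as add_field_in_dataframe("state.a", df, IntegerType()) would therefore rebuild only the state struct, adding a as a null integer field.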
You can create a new column using a udf with the schema you desire and drop the old one. As far as I know, you can't change the schema of a struct column. See this question
– ashwinids
Feb 14 '18 at 3:07