PySpark / Spark Window Function First/ Last Issue

PySpark / Spark Window Function First/ Last Issue



From my understanding first/ last function in Spark will retrieve first / last row of each partition/ I am not able to understand why LAST function is giving incorrect results.



This is my code.


AgeWindow = Window.partitionBy('Dept').orderBy('Age')
df1 = df1.withColumn('first(ID)', first('ID').over(AgeWindow))
.withColumn('last(ID)', last('ID').over(AgeWindow))
df1.show()


+---+----------+---+--------+--------------------------+-------------------------+
|Age| Dept| ID| Name|first(ID) |last(ID) |
+---+----------+---+--------+--------------------------+-------------------------+
| 38| medicine| 4| harry| 4| 4|
| 41| medicine| 5|hermione| 4| 5|
| 55| medicine| 7| gandalf| 4| 7|
| 15|technology| 6| sirius| 6| 6|
| 49|technology| 9| sam| 6| 9|
| 88|technology| 1| sam| 6| 2|
| 88|technology| 2| nik| 6| 2|
| 75| mba| 8| ginny| 8| 11|
| 75| mba| 10| sam| 8| 11|
| 75| mba| 3| ron| 8| 11|
| 75| mba| 11| ron| 8| 11|
+---+----------+---+--------+--------------------------+-------------------------+




1 Answer
1



It is not incorrect. Your window definition is just not what you think it is.



If you provide ORDER BY clause then the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:


ORDER BY


RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW


from pyspark.sql.window import Window
from pyspark.sql.functions import first, last

w = Window.partitionBy('Dept').orderBy('Age')

df = spark.createDataFrame(
[(38, "medicine", 4), (41, "medicine", 5), (55, "medicine", 7)],
("Age", "Dept", "ID")
)

df.select(
"*",
first('ID').over(w).alias("first_id"),
last('ID').over(w).alias("last_id")
).explain()


== Physical Plan ==
Window [first(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS first_id#38L, last(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS last_id#40L], [Dept#23], [Age#22L ASC NULLS FIRST]
+- *(1) Sort [Dept#23 ASC NULLS FIRST, Age#22L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(Dept#23, 200)
+- Scan ExistingRDD[Age#22L,Dept#23,ID#24L]



This means that the window function never looks ahead and the last row in the frame is the current row.



You should redefine the window as


w_uf = (Window
.partitionBy('Dept')
.orderBy('Age')
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

result = df.select(
"*",
first('ID').over(w_uf).alias("first_id"),
last('ID').over(w_uf).alias("last_id")
)


== Physical Plan ==
Window [first(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS first_id#56L, last(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS last_id#58L], [Dept#23], [Age#22L ASC NULLS FIRST]
+- *(1) Sort [Dept#23 ASC NULLS FIRST, Age#22L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(Dept#23, 200)
+- Scan ExistingRDD[Age#22L,Dept#23,ID#24L]


result.show()


+---+--------+---+--------+-------+
|Age| Dept| ID|first_id|last_id|
+---+--------+---+--------+-------+
| 38|medicine| 4| 4| 7|
| 41|medicine| 5| 4| 7|
| 55|medicine| 7| 4| 7|
+---+--------+---+--------+-------+






Thanks. It worked. I was trying other functions like lag, lead, cume_dist but they gave error for same window. Window Frame specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()) must match the required frame specifiedwindowframe(RowFrame, 1, 1);

– Nikhil Redij
Sep 11 '18 at 10:30



Window Frame specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()) must match the required frame specifiedwindowframe(RowFrame, 1, 1);






Some functions can be applied only with a very specific frame. For example lead and lag are strictly defined by their offset.

– user6910411
Sep 11 '18 at 10:35


lead


lag






It is good to know a solved bug jira.apache.org/jira/browse/SPARK-24033 about window functions in pyspark. Because of the bug you should check your spark version beforehand.

– Vezir
Jan 15 at 8:18



Thanks for contributing an answer to Stack Overflow!



But avoid



To learn more, see our tips on writing great answers.



Required, but never shown



Required, but never shown




By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.

Popular posts from this blog

𛂒𛀶,𛀽𛀑𛂀𛃧𛂓𛀙𛃆𛃑𛃷𛂟𛁡𛀢𛀟𛁤𛂽𛁕𛁪𛂟𛂯,𛁞𛂧𛀴𛁄𛁠𛁼𛂿𛀤 𛂘,𛁺𛂾𛃭𛃭𛃵𛀺,𛂣𛃍𛂖𛃶 𛀸𛃀𛂖𛁶𛁏𛁚 𛂢𛂞 𛁰𛂆𛀔,𛁸𛀽𛁓𛃋𛂇𛃧𛀧𛃣𛂐𛃇,𛂂𛃻𛃲𛁬𛃞𛀧𛃃𛀅 𛂭𛁠𛁡𛃇𛀷𛃓𛁥,𛁙𛁘𛁞𛃸𛁸𛃣𛁜,𛂛,𛃿,𛁯𛂘𛂌𛃛𛁱𛃌𛂈𛂇 𛁊𛃲,𛀕𛃴𛀜 𛀶𛂆𛀶𛃟𛂉𛀣,𛂐𛁞𛁾 𛁷𛂑𛁳𛂯𛀬𛃅,𛃶𛁼

Edmonton

Crossroads (UK TV series)