PySpark / Spark Window Function First/ Last Issue
PySpark / Spark Window Function First/ Last Issue
From my understanding first/ last function in Spark will retrieve first / last row of each partition/ I am not able to understand why LAST function is giving incorrect results.
This is my code.
AgeWindow = Window.partitionBy('Dept').orderBy('Age')
df1 = df1.withColumn('first(ID)', first('ID').over(AgeWindow))
.withColumn('last(ID)', last('ID').over(AgeWindow))
df1.show()
+---+----------+---+--------+--------------------------+-------------------------+
|Age| Dept| ID| Name|first(ID) |last(ID) |
+---+----------+---+--------+--------------------------+-------------------------+
| 38| medicine| 4| harry| 4| 4|
| 41| medicine| 5|hermione| 4| 5|
| 55| medicine| 7| gandalf| 4| 7|
| 15|technology| 6| sirius| 6| 6|
| 49|technology| 9| sam| 6| 9|
| 88|technology| 1| sam| 6| 2|
| 88|technology| 2| nik| 6| 2|
| 75| mba| 8| ginny| 8| 11|
| 75| mba| 10| sam| 8| 11|
| 75| mba| 3| ron| 8| 11|
| 75| mba| 11| ron| 8| 11|
+---+----------+---+--------+--------------------------+-------------------------+
1 Answer
1
It is not incorrect. Your window definition is just not what you think it is.
If you provide ORDER BY
clause then the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
:
ORDER BY
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
from pyspark.sql.window import Window
from pyspark.sql.functions import first, last
w = Window.partitionBy('Dept').orderBy('Age')
df = spark.createDataFrame(
[(38, "medicine", 4), (41, "medicine", 5), (55, "medicine", 7)],
("Age", "Dept", "ID")
)
df.select(
"*",
first('ID').over(w).alias("first_id"),
last('ID').over(w).alias("last_id")
).explain()
== Physical Plan ==
Window [first(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS first_id#38L, last(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS last_id#40L], [Dept#23], [Age#22L ASC NULLS FIRST]
+- *(1) Sort [Dept#23 ASC NULLS FIRST, Age#22L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(Dept#23, 200)
+- Scan ExistingRDD[Age#22L,Dept#23,ID#24L]
This means that the window function never looks ahead and the last row in the frame is the current row.
You should redefine the window as
w_uf = (Window
.partitionBy('Dept')
.orderBy('Age')
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
result = df.select(
"*",
first('ID').over(w_uf).alias("first_id"),
last('ID').over(w_uf).alias("last_id")
)
== Physical Plan ==
Window [first(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS first_id#56L, last(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS last_id#58L], [Dept#23], [Age#22L ASC NULLS FIRST]
+- *(1) Sort [Dept#23 ASC NULLS FIRST, Age#22L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(Dept#23, 200)
+- Scan ExistingRDD[Age#22L,Dept#23,ID#24L]
result.show()
+---+--------+---+--------+-------+
|Age| Dept| ID|first_id|last_id|
+---+--------+---+--------+-------+
| 38|medicine| 4| 4| 7|
| 41|medicine| 5| 4| 7|
| 55|medicine| 7| 4| 7|
+---+--------+---+--------+-------+
Window Frame specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()) must match the required frame specifiedwindowframe(RowFrame, 1, 1);
Some functions can be applied only with a very specific frame. For example
lead
and lag
are strictly defined by their offset.– user6910411
Sep 11 '18 at 10:35
lead
lag
It is good to know a solved bug jira.apache.org/jira/browse/SPARK-24033 about window functions in pyspark. Because of the bug you should check your spark version beforehand.
– Vezir
Jan 15 at 8:18
Thanks for contributing an answer to Stack Overflow!
But avoid …
To learn more, see our tips on writing great answers.
Required, but never shown
Required, but never shown
By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.
Thanks. It worked. I was trying other functions like lag, lead, cume_dist but they gave error for same window.
Window Frame specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()) must match the required frame specifiedwindowframe(RowFrame, 1, 1);
– Nikhil Redij
Sep 11 '18 at 10:30