I ended up learning how the Spark Catalyst optimizer performs a so-called predicate pushdown after running into a strange issue; a simplified example follows.

The behavior was encountered when a DataFrame df has the columns “left_id” and “free_form”,

and a second DataFrame df_ref has the columns “right_id” and “location”,

and the column “free_form” contains yyyyMMdd dates only when “location” is “12345”, which is why a join is performed:

start_date = '2024-01-01'
end_date = '2025-12-31'

from pyspark.sql import functions as f

(df.join(df_ref, df.left_id == df_ref.right_id, "left")
    .filter(f.col("location") == "12345")
    .withColumn("foo_date", f.to_date("free_form", "yyyyMMdd"))
    .where(f.col("foo_date").between(start_date, end_date))
    .display())

But mysteriously this crashes when the final start_date/end_date filter is applied, with

SparkUpgradeException: [INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER] You may get a different result due to the upgrade, caused by: DateTimeParseException: Text '1234567890' could not be parsed at index 0

even though I have double- and triple-checked: 1234567890 appears in “free_form” only on rows that should already have been removed by the filter f.col("location") == "12345".

It’s almost as if the operations are being done out of order.

And after some research, indeed I learned that the Catalyst optimizer will often move filter expressions as close to the data read as possible, as long as the outcome would be unaffected.

Indeed that was likely the case here.

Finally, what worked

I ended up adding a guard condition to the date parsing, which prevents the parse error on values that are not valid date strings.

start_date = '2024-01-01'
end_date = '2025-12-31'

(df.join(
    df_ref, df.left_id == df_ref.right_id, "left")
    .filter(f.col("location") == "12345")
    .withColumn(
        "foo_date", f.when(
            f.col("free_form").rlike(r'^\d{8}$'),
            f.to_date("free_form", "yyyyMMdd")
        )
    )
    .where(f.col("foo_date").between(start_date, end_date))
    .display()
)

I also did not just trust the outcome: I ran a materialized version of the query, compared it to the one above using .subtract(), and found them to be equivalent, yet the non-materialized one was 27 times faster!

From the Catalyst optimizer notes:

[image]

References

  1. https://www.sparkplayground.com/tutorials/spark-theory/catalyst-optimizer
  2. https://learn.microsoft.com/en-us/azure/databricks/pyspark/reference/functions/try_to_date