Purpose:

The primary objective of this document is to establish a clear understanding of the coding standards and best practices to adhere to while developing PySpark components. Best practices are procedures that are accepted as being the most effective, either by consensus or by prescription, and they range from stylistic conventions to in-depth design methodologies.

Documenting best practices for Databricks also requires covering some key Python practices, so we start with general Python standards and then move on to PySpark specifics.

Code Layout:

Indentation

Use 4 spaces for each indentation level. Continuation lines should either align wrapped elements vertically with the opening delimiter or use a hanging indent. When using a hanging indent, there should be no arguments on the first line, and further indentation should be used to clearly distinguish the continuation lines.

# CORRECT:

# Aligned with opening delimiter
foo = function_name(var_one, var_two,
                    var_three, var_four)

# Add 4 spaces (an extra level of indentation) to distinguish the arguments from the rest
def function_name(
        var_one, var_two,
        var_three, var_four):
    print(var_one)

# Hanging indent should add a level
foo = function_name(
    var_one, var_two,
    var_three, var_four)

# WRONG:

# Arguments on first line forbidden when not using vertical alignment
foo = long_function_name(var_one, var_two,
    var_three, var_four)

# Further indentation required as this indentation is not distinguishable from the body
def long_function_name(
    var_one, var_two, var_three,
    var_four):
    print(var_one)

Adding two blank lines before functions and classes

Top-level functions and classes should be separated by two blank lines
Method definitions inside a class should be separated by one blank line
Extra blank lines may be used sparingly to separate groups of related functions
Use blank lines inside functions sparingly, to indicate logical sections

# Top-level functions and classes are separated with two blank lines
# Method definitions within a class are separated with a single blank line
class MyParentClass:

    def function_one(self, var_one, var_two):
        print(var_one)
        print(var_two)

    def function_two(self, var_one, var_two):
        print(var_one + var_two)

Limiting Line Lengths

Limit all lines to a maximum of 79 characters
For flowing long blocks of text with fewer structural restrictions (docstrings or comments), the line length should be restricted to 72 characters
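As a simple illustration, a long condition can be kept within the limit by relying on implicit line continuation inside parentheses (the names used here are hypothetical):

# Wrap a long condition inside parentheses instead of exceeding the line limit
if (customer.is_active
        and customer.balance > 0
        and customer.country_code == 'US'):
    process_customer(customer)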

Imports

Imports should usually be on separate lines
Wildcard imports should be avoided, as they do not give a clear picture of which names are present in the namespace

# Correct:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import sys
import os

# Wrong:
import sys, os
from pyspark.sql import *

Naming Conventions

Do not use 'l', 'O', or 'I' as single-character variable names. In some fonts these characters are indistinguishable from the numerals 1 and 0, which causes confusion.
Generally, short names are recommended. Where it improves readability, separate words with underscores.
Prefixing a name with a single underscore (_) marks a module-level variable or function as internal by convention
Prefixing an instance attribute or method with a double underscore (__) makes it private to its class through name mangling (see the sketch after the table below)

 

Type      | Naming Convention                                                                                     | Example
Function  | Use lowercase words separated with underscores                                                        | myfunction, my_function
Variable  | Use a lowercase letter, word, or words separated with underscores                                     | x, my_variable
Class     | Use Pascal case: capitalize the first letter of each word and do not separate words with underscores | MyClass, ProcessCustomerData
Method    | Use lowercase words separated with underscores                                                        | get_customer_data
Constant  | Use an uppercase letter, word, or words separated with underscores                                   | X, LOCATION, LOCATION_COUNT
Module    | Use short lowercase words separated with underscores                                                  | support_module.py
Package   | Use short lowercase words without underscores                                                         | supportmodule, mypackage
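A minimal sketch of the underscore conventions mentioned above (all names here are hypothetical):

_RETRY_LIMIT = 3  # Single leading underscore: internal to this module by convention


class CustomerLoader:

    def _build_query(self):
        # Single leading underscore: intended for internal use only
        return 'SELECT * FROM customers'

    def __connect(self):
        # Double leading underscore: name-mangled to _CustomerLoader__connect,
        # effectively making it private to this class
        pass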

General Recommendations

Comparing singletons

Use ‘is’ while comparing singletons
Use ‘is not’ instead of ‘not … is’

# Correct:
if foo is not None:
    do_something()

# Wrong:
if foo == None:
    do_something()

# Wrong:
if not foo is None:
    do_something()

Avoid lambda expressions

Always use a def statement instead of an assignment statement that binds a lambda expression directly to a name.

# Correct:
def multiply(x):
    return 2 * x

# Wrong:
f = lambda x: 2 * x

Deriving Exceptions

Derive exceptions from Exception rather than BaseException
Catch specific exceptions wherever possible; avoid bare or overly broad except clauses
Keep the logic inside the try block as simple as possible

# Correct:
try:
    import platform_specific_module
except ImportError:
    platform_specific_module = None
else:
    do_something()

# Wrong:
try:
    import platform_specific_module
    do_something()
except ImportError:
    platform_specific_module = None

Boolean Comparison

Booleans are already Booleans; they do not need to be compared to True or False
For sequences (e.g., lists), use the fact that empty sequences are falsy

# Correct:
if is_active_customer:
    do_something()

# Wrong:
if is_active_customer == True:
    do_something()

# Wrong:
if is_active_customer is True:
    do_something()

# Wrong: an empty list already evaluates to False, so there is no need to check its length
if len(customer_list) != 0:
    do_something()

Databricks – Best Practices

Avoid print statements. Use the logging module instead (illustrated, together with secret handling, in the sketch after the SQL example below).
Ensure reusability of code modules throughout the file. Use existing reusable components instead of redundantly creating new functions.
When using a recursive function with Spark, make sure it has an appropriate termination condition; otherwise it will lead to over-utilization of resources.
Usernames, passwords, and hostnames should not be kept directly in a Python file or notebook. Sensitive information should be managed in a secure vault and referenced from the Python file or notebook using keys.
When using SQL statements, it is recommended to assign the SQL statement to a variable and pass the variable to the Spark SQL API.

# Correct:
sql_query = 'SELECT col_1, col_2, col_3 FROM table'
df_data = spark.sql(sql_query)

# Bad:
df_data = spark.sql('SELECT col_1, col_2, col_3 FROM table')
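A minimal sketch of the logging and secrets recommendations above; the secret scope and key names are hypothetical, and dbutils is available inside Databricks notebooks:

import logging

logger = logging.getLogger(__name__)

# Use the logging module instead of print statements
logger.info('Starting customer data load')

# Read credentials from a secret scope instead of hard-coding them in the notebook
jdbc_password = dbutils.secrets.get(scope='prod-credentials', key='warehouse-password')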

Refactor complex chains of expressions. It is recommended to split a chain into multiple statements when it mixes operations of different types, especially when those operations have different behaviour and context. An example is mixing column creation or joins with selects and filters.

# Bad:
df = (
    df
    .select('a', 'b', 'c', 'key')
    .filter(F.col('a') == 'truthiness')
    .withColumn('boverc', F.col('b') / F.col('c'))
    .join(df2, 'key', how='inner')
    .join(df3, 'key', how='left')
    .drop('c')
)

# Better (separating into steps):
# Step 1: select and trim down the data that we need
# Step 2: create the columns that we need to have
# Step 3: join with other dataframes
df = (
    df
    .select('a', 'b', 'c', 'key')
    .filter(F.col('a') == 'truthiness')
)

df = df.withColumn('boverc', F.col('b') / F.col('c'))

df = (
    df
    .join(df2, 'key', how='inner')
    .join(df3, 'key', how='left')
    .drop('c')
)

Use a select statement to specify a schema contract. Doing a select at the beginning of a transform, or before returning, is considered good practice. Any select should be seen as a cleaning operation that prepares the dataframe for consumption by the next step of the transform.

Keep select statements as simple as possible. Following common SQL idioms, allow only one function from pyspark.sql.functions per selected column, plus an optional .alias() to give it a meaningful name.

Expressions involving more than one dataframe, or conditional operations like .when(), are discouraged inside a select unless required for performance reasons.

# Good:
aircraft = aircraft.select(
    'aircraft_id',
    'aircraft_msn',
    'aircraft_type',
    'operator_code',
    F.col('aircraft_registration').alias('registration'),
    F.col('number_of_economy_seats').cast('long'),
    F.col('number_of_business_seats').cast('long'),
    F.avg('staleness').alias('avg_staleness'),
    F.avg('flight_hours').alias('avg_flight_hours'),
)

# Bad:
aircraft = aircraft.select(
    'aircraft_id',
    'aircraft_msn',
    F.col('aircraft_registration').alias('registration'),
    'aircraft_type',
    F.avg('staleness').alias('avg_staleness'),
    F.col('number_of_economy_seats').cast('long'),
    F.avg('flight_hours').alias('avg_flight_hours'),
    'operator_code',
    F.col('number_of_business_seats').cast('long'),
)

Instead of using withColumnRenamed(), use aliases. Likewise, instead of using withColumn() to redefine a column's type, cast it in the select.

# Good:
df.select('key', F.col('comments').alias('num_comments'))

# Good:
df.select(F.col('comments').cast('double'))

# Bad:
df.select('key', 'comments').withColumnRenamed('comments', 'num_comments')

# Bad:
df.select('comments').withColumn('comments', F.col('comments').cast('double'))

 

Where an empty column is required to satisfy a schema, always use F.lit(None) to populate it. Never use an empty string or another value that stands for "empty", such as "NA", "N/A", or "Nil".

Besides being semantically correct, the main reason for recommending F.lit(None) is that it preserves the ability to use utilities like isNull, instead of having to check for empty strings, "NA", "N/A", or "Nil".

# Good:
df = df.withColumn('foo', F.lit(None))

# Bad:
df = df.withColumn('foo', F.lit(''))

# Bad:
df = df.withColumn('foo', F.lit('NA'))

User-Defined Functions (UDF)

It is highly recommended to avoid UDFs in all situations, as they are less performant than native PySpark. In most cases, logic that seems to necessitate a UDF can be refactored to get rid of the UDF and use only the native PySpark API.
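As an illustration, a UDF that only formats strings can usually be replaced with built-in column functions. A minimal sketch, with hypothetical column names:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Bad: a Python UDF forces every row through the Python interpreter
@F.udf(returnType=StringType())
def full_name(first, last):
    return f'{first} {last}'.upper()

df = df.withColumn('full_name', full_name('first_name', 'last_name'))

# Good: the same logic using native column expressions
df = df.withColumn('full_name', F.upper(F.concat_ws(' ', 'first_name', 'last_name')))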

Joins

It is recommended to be cautious with joins. For instance, when performing a left join where the right-side table has multiple matches for a key, the matching row is duplicated as many times as there are matches, which can heavily distort the output of the transformation job (see the sketch below).
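A minimal sketch of guarding against this fan-out by de-duplicating the right-hand side on the join key before joining (the dataframe and column names are hypothetical):

# If sensor_data can hold several rows per vehicle_id, the left join would
# multiply telemetry rows; de-duplicate the right side on the key first
deduplicated_sensor_data = sensor_data.dropDuplicates(['vehicle_id'])
telemetry = telemetry.join(deduplicated_sensor_data, on='vehicle_id', how='left')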

Always specify the how keyword explicitly, even when doing the default inner join.

# Good:
telemetry = telemetry.join(sensor_data, 'vehicle_id', how='inner')

# Bad:
telemetry = telemetry.join(sensor_data, 'vehicle_id')

# Bad:
telemetry = telemetry.join(sensor_data, 'vehicle_id', 'inner')

Avoid right joins. If you are about to use a right join, swap the order of the dataframes and use a left join instead. It is more intuitive, since the dataframe you are operating on is the one you are centering the join around.

# Good:
telemetry = telemetry.join(sensors, on='vehicle_id', how='left')

# Bad:
sensors = sensors.join(telemetry, on='vehicle_id', how='right')

Cache table/dataframe for re-usable tables

cache() can be used on an RDD, Dataframe, or Dataset when you perform more than one operation on that entity. Because cache() is lazily evaluated, the caching takes place only when a Spark action (count, show, take, or write) is subsequently performed on the same dataframe, dataset, or RDD.

df1 = spark.read.csv(input_path_1)
df2 = spark.read.csv(input_path_2)
df1.cache()  # Mark Dataframe df1 for caching (lazy)

joined_df = df1.join(df2, df1.id == df2.id, how='inner')  # Join Dataframes df1 and df2
filtered_df = joined_df.filter("id == 'ID100'")  # Filter the joined Dataframe for id 'ID100'
df1.count()  # Action: triggers caching of Dataframe df1
filtered_df.show()  # Show data from the filtered Dataframe filtered_df

In the above snippet, Dataframe df1 will be cached into memory only when the df1.count() action is executed; df1.cache() alone does not initiate the caching of Dataframe df1.

df = spark.read.csv(input_file_path)
df.cache().take(10)  # Calling take(10) on the dataframe while caching it
df.count()  # Call count() on the cached dataframe df

In the above snippet, Dataframe df will be cached into memory when the take(10) action is executed. However, there is a catch: only one partition will be cached. take(10) processes only 10 records, so only the partition containing those 10 records is cached and the other partitions are not. As a result, the next statement, df.count(), will recompute the dataframe df. In contrast, df.cache().count() caches the records from all partitions. Hence, it is recommended to use df.cache().count() whenever the use case requires all data to be cached.

# Recommended
df = spark.table('input_table_data')
df.cache().count()  # Cache dataframe df across all partitions
df.count()  # count() now returns the result from the cached dataframe

String Comparison

String comparison in Databricks is case sensitive; strings that differ only in casing do not match unless both sides are normalized, for example with LOWER() or UPPER().

sql_query = """SELECT 'DAGSequence' = 'dagsequence' AS WithoutLowerUpper,
               LOWER('DAGSequence') = 'dagsequence' AS WithLowerCase,
               UPPER('DAGSequence') = 'DAGSEQUENCE' AS WithUpperCase"""
df = spark.sql(sql_query)
df.show()

Result:

+-----------------+-------------+-------------+
|WithoutLowerUpper|WithLowerCase|WithUpperCase|
+-----------------+-------------+-------------+
|            false|         true|         true|
+-----------------+-------------+-------------+
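The same normalization applies when comparing string columns through the DataFrame API. A minimal sketch, assuming a dataframe df with a hypothetical column dag_name:

from pyspark.sql import functions as F

# Normalize the casing before comparing
matches = df.filter(F.lower(F.col('dag_name')) == 'dagsequence')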

Table Partitioning

Delta tables in Databricks support partitioning, which enhances performance. Partition by a column only if you expect each partition to contain at least 1 GB of data. Do not partition by a high-cardinality column: for example, if you partition by user ID and there are 1M distinct user IDs, partitioning would increase table load time. Example:

CREATE TABLE weather(
    weather_date DATE,
    location STRING,
    location_type STRING,
    temperature DOUBLE
) USING delta PARTITIONED BY (location)

(or)

CREATE TABLE weather(
    location_type STRING,
    temperature DOUBLE
) PARTITIONED BY (
    weather_date DATE,
    location STRING
)

Delta Lake performance using OPTIMIZE with ZORDER

Z-Ordering is an approach to collocate related information in the same set of files. This co-locality is automatically used by the data-skipping algorithms in Delta Lake on Databricks to greatly reduce the amount of data that needs to be read. To Z-Order data, specify the columns to order on in the ZORDER BY clause.

OPTIMIZE events
WHERE date > current_timestamp() - INTERVAL 1 day
ZORDER BY (event_type)

Use of Repartition hints for balancing partitions

Below are the different partitioning hint types (see the sketch after this list):

COALESCE: Reduces the number of partitions to the specified number. It takes a partition number as a parameter.

REPARTITION: Repartitions to the specified number of partitions using the specified partitioning expressions. It takes a partition number, column names, or both as parameters.

REPARTITION_BY_RANGE: Repartitions to the specified number of partitions using the specified partitioning expressions. It takes column names and an optional partition number as parameters.

REBALANCE: Rebalances the query result output partitions so that every partition is of a reasonable size. It can take column names as parameters. This hint is useful when the result of a query is written to a table, to avoid files that are too small or too big. It is ignored if AQE (Adaptive Query Execution) is not enabled.
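A minimal sketch of these hints in Spark SQL (the table and column names are hypothetical):

# Reduce the output to 3 partitions without a full shuffle
df_small = spark.sql('SELECT /*+ COALESCE(3) */ * FROM sales')

# Shuffle into 100 partitions distributed by customer_id
df_repart = spark.sql('SELECT /*+ REPARTITION(100, customer_id) */ * FROM sales')

# Let AQE rebalance the output partitions before writing
df_balanced = spark.sql('SELECT /*+ REBALANCE */ * FROM sales')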

Delete temporary table after notebook execution

Delete temporary tables that were created as intermediate tables during notebook execution. Dropping them saves storage, especially if the notebook is scheduled to run daily.

spark.catalog.dropTempView('temp_view_name')

spark.sql('DROP VIEW temp_view_name')

Use views when creating intermediate tables

If we need to create intermediate tables, use views to minimize storage usage and save costs. Views are session-scoped and are removed automatically when the session ends, so nothing is persisted to storage. For optimal query performance, do not use joins or subqueries in views.

df.createOrReplaceTempView('tmp_research_data')  # This view is available for the duration of the current user session

df.createOrReplaceGlobalTempView('gv_research_data')  # This view is available for the lifetime of the Spark session (cluster)