Delta tables carry many unique, useful properties. This post deconstructs three core concepts used by Delta tables for query runtime and space optimization.
Compaction
This is the most basic form of space optimization. When large amounts of data are stored, two main issues can arise:
File-system overhead: the Master File Table (or File Allocation Table on older operating systems) must keep a record for each individual file. This only becomes a problem with exceedingly large numbers of small, fragmented files, but the cost compounds when multiple tables across separate projects are mounted on a common file system.
Random query performance: this occurs when no indexing is done on the smaller files, i.e. column data is distributed randomly across many small files. While we may wish to split a huge table into smaller files (see partitioning below), randomly fragmented files pose a challenge for query optimization: a single query requires multiple file reads and can cause high memory usage.
Compaction coalesces small files into fewer larger ones. This is the simplest form of optimization and is best demonstrated by example. In the following Delta table, there are 100 .parquet files, each only a few kilobytes in size; by default, OPTIMIZE treats files below a minimum size threshold of 1GB as candidates for compaction. To compact this Delta table, we run the optimize command:
exa -G1lah /storage/data/airline_2m.delta/
# Output:
Permissions Size User Date Modified Name
.rw-r--r-- 9.7k aadi 4 Feb 19:06 .part-00000-5165bb55-69be-421e-8c15-c4d2a3eadaeb-c000.snappy.parquet.crc
.rw-r--r-- 9.7k aadi 4 Feb 19:06 .part-00001-96593b3c-c4fa-4dfe-993f-4a2695dee340-c000.snappy.parquet.crc
.rw-r--r-- 9.7k aadi 4 Feb 19:06 .part-00002-88ac3720-f586-44d7-9c50-1a8daba4018b-c000.snappy.parquet.crc
.rw-r--r-- 9.7k aadi 4 Feb 19:06 .part-00003-59657ab2-6feb-46ce-8fa0-fb74abf0c2ee-c000.snappy.parquet.crc
.rw-r--r-- 9.7k aadi 4 Feb 19:06 .part-00004-89d17de3-2e7b-4ea3-8960-26189961ffab-c000.snappy.parquet.crc
.rw-r--r-- 9.6k aadi 4 Feb 19:06 .part-00005-5b559b67-80af-45b5-9aab-8380f4f048a7-c000.snappy.parquet.crc
.rw-r--r-- 9.6k aadi 4 Feb 19:06 .part-00006-4786459c-cbdc-4620-80f3-2c3a6c24537c-c000.snappy.parquet.crc
.rw-r--r-- 9.7k aadi 4 Feb 19:06 .part-00007-548de79d-7c92-4768-a0a7-d99ac59b847e-c000.snappy.parquet.crc
Below, after running the compaction followed by a VACUUM operation, we can see that the table has been condensed down to one larger .parquet file:
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "/storage/data/airline_2m.delta/")
dt.optimize().executeCompaction()
# vacuuming with such a short retention (0.1 hours) requires disabling the retention-duration check
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
dt.vacuum(0.1)  # only to show the changed directory! Old files are normally kept for versioning
ls -lah /storage/data/airline_2m.delta/
# Output
total 101M
drwxr-xr-x 3 aadi aadi 28K Feb 4 19:20 .
drwxr-xr-x 13 aadi 1001 4.0K Feb 4 19:05 ..
drwxr-xr-x 2 aadi aadi 4.0K Feb 4 19:20 _delta_log
-rw-r--r-- 1 aadi aadi 100M Feb 4 19:16 part-00000-081cb5a6-1fd0-4050-9243-b775a26bc8c3-c000.snappy.parquet
-rw-r--r-- 1 aadi aadi 799K Feb 4 19:16 .part-00000-081cb5a6-1fd0-4050-9243-b775a26bc8c3-c000.snappy.parquet.crc
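As a quick check, the table history records both operations and the old versions remain available for time travel. A minimal sketch (the selected columns are just an illustrative subset of what DeltaTable.history() returns):
# history() returns a Spark DataFrame describing past operations on the table
dt.history().select("version", "timestamp", "operation", "operationMetrics").show(truncate=False)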
Bin-Packing
This is done in parallel (pun intended!) with compaction. For each column, min/max statistics are stored per file. When reading the Delta table, the min/max of every file is checked against the query, and files whose ranges cannot contain matching rows (e.g. because of a filter) are not read at all. These statistics all live in the _delta_log.
For this example, the original table has been re-written as a Delta table split across 10 parquet files:
csv = spark.read.csv('/storage/data/airline_2m.csv', header=True)
csv.repartition(10).write.format('delta').save('/storage/data/airline_2m.delta')
If we look at the _delta_log folder, we can see where these columnar statistics are kept. Without going into every single detail of the Delta logs as JSON files, the most important points are: JSON is an object format of key-value pairs, where each value can itself be another set of key-value pairs, and for each operation a new object, with its parent key named after the given operation, is appended to the log. For example, an INSERT operation (aliased to add) may consist of multiple adds making up a single table write. In this example, the log file had 10 separate adds, one for each of the 10 parquet files being written. The output was long, so the non-relevant parts have been removed:
Here we can see the name of the operation:
{
"add": {
Here we see the actual parquet file being referred to within the parent Delta-table folder, along with metadata:
"path": "part-00002-5095c00a-c9d5-4c0b-b5de-f84b38d2b84b-c000.snappy.parquet",
"partitionValues": {},
"size": 10520682,
"modificationTime": 1707089734401,
"dataChange": true,
This is the relevant portion to understand for bin-packing. Each column keeps a set of values under the stats key (this is a fairly wide file, so only certain columns are shown for demonstration). There are a few things of note. If the types are correctly inferred, the process works as expected: the FlightDate min/max makes sense at 1987 and 2020, and for string features the min/max is alphabetical, with the min taken as a state beginning with the letter A (which comes before the first letter of the state in the max). However, the Tail_Number column seems to have some data-quality issues, as the - symbol starting the minimum Tail_Number is not a valid American tail number. As such, the statistics for this column may be off.
"stats": {
"numRecords": 200000,
"minValues": {
"Year": "1987",
"Quarter": "1",
"Month": "1",
"DayofMonth": "1",
"DayOfWeek": "1",
"FlightDate": "1987-10-01",
"Reporting_Airline": "9E",
"DOT_ID_Reporting_Airline": "19386",
"IATA_CODE_Reporting_Airline": "9E",
"Tail_Number": "-N047M",
"Flight_Number_Reporting_Airline": "1",
"OriginAirportID": "10135",
"OriginAirportSeqID": "1013501",
"OriginCityMarketID": "30070",
"Origin": "ABE",
"OriginCityName": "Aberdeen, SD",
"OriginState": "AK",
"OriginStateFips": "01",
"OriginStateName": "Alabama",
"OriginWac": "1",
"DestAirportID": "10135",
"DestAirportSeqID": "1013501",
"DestCityMarketID": "30070",
"Dest": "ABE",
"DestCityName": "Aberdeen, SD",
"DestState": "AK",
"DestStateFips": "01",
"DestStateName": "Alabama",
"DestWac": "1",
"CRSDepTime": "0000",
"DepTime": "0001",
"DepDelay": "-1.00"
},
"maxValues": {
"Year": "2020",
"Quarter": "4",
"Month": "9",
"DayofMonth": "9",
"DayOfWeek": "7",
"FlightDate": "2020-03-31",
"Reporting_Airline": "YX",
"DOT_ID_Reporting_Airline": "21171",
"IATA_CODE_Reporting_Airline": "YX",
"Tail_Number": "�NKNO�",
"Flight_Number_Reporting_Airline": "999",
"OriginAirportID": "16440",
"OriginAirportSeqID": "1644001",
"OriginCityMarketID": "35991",
"Origin": "YUM",
"OriginCityName": "Yuma, AZ",
"OriginState": "WY",
"OriginStateFips": "78",
"OriginStateName": "Wyoming",
"OriginWac": "93",
"DestAirportID": "16440",
"DestAirportSeqID": "1644001",
"DestCityMarketID": "36101",
"Dest": "YUM",
"DestCityName": "Yuma, AZ",
"DestState": "WY",
"DestStateFips": "78",
"DestStateName": "Wyoming",
"DestWac": "93",
"CRSDepTime": "2359",
"DepTime": "2400",
"DepDelay": "995.00"
},
}
}
}
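Given these per-file statistics, a read with a filter only needs to open files whose min/max range overlaps the predicate. A minimal sketch of such a query (the column name is taken from the stats above):
# Files whose FlightDate range cannot contain 2019+ dates would be skipped entirely
(
    spark.read.format('delta')
    .load('/storage/data/airline_2m.delta/')
    .filter("FlightDate >= '2019-01-01'")
    .count()
)
Note that because the 10 files here were produced by a random repartition, each file spans the full 1987-2020 range, so little is actually skipped in this particular layout; the partitioning and Z-ordering techniques below are what make these statistics selective.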
OPTIMIZE can be tuned using a few different options:
minFileSize: groups files smaller than this threshold into larger ones
maxFileSize: specifies the target file size
repartition.enabled: changes the behaviour of OPTIMIZE to use repartition instead of coalesce when reducing the number of files
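These are Spark session configurations. A sketch of tuning them before re-running the compaction, assuming the keys sit under the spark.databricks.delta.optimize prefix documented by Delta:
from delta.tables import DeltaTable
# Sizes are in bytes: target ~256MB files instead of the 1GB default
spark.conf.set("spark.databricks.delta.optimize.minFileSize", str(256 * 1024 * 1024))
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", str(256 * 1024 * 1024))
# Rewrite files with repartition() rather than coalesce()
spark.conf.set("spark.databricks.delta.optimize.repartition.enabled", "true")
DeltaTable.forPath(spark, "/storage/data/airline_2m.delta/").optimize().executeCompaction()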
Partitioning
This is more of a tangent, but it is used in tandem with Z-ordering. Partitioning specifies manual bins for storing the data. For example, using our original dataset, we may partition by the column most frequently used in filter operations (similar to an index, but more tangible):
# reading in our original delta file
(
spark.read.format('delta')
.load('/storage/data/airline.delta')
# re-writing to partition
.write.format('delta').mode('overwrite').option('overwriteSchema', 'true')
# by OriginStateName
.partitionBy('OriginStateName')
.save('/storage/data/airline.delta')
)
Now if we observe the new Delta directory:
> ls -lah /storage/data/airline.delta | head -10
# no more random file-names
total 9.1G
drwxr-xr-x 57 aadi aadi 140K Jan 28 09:28 .
drwxr-xr-x 12 aadi 1001 4.0K Jan 27 14:40 ..
drwxr-xr-x 2 aadi aadi 4.0K Jan 28 09:30 _delta_log
drwxr-xr-x 2 aadi aadi 4.0K Jan 28 09:28 OriginStateName=Alabama
drwxr-xr-x 2 aadi aadi 4.0K Jan 28 09:28 OriginStateName=Alaska
drwxr-xr-x 2 aadi aadi 4.0K Jan 28 09:28 OriginStateName=Arizona
drwxr-xr-x 2 aadi aadi 4.0K Jan 28 09:29 OriginStateName=Arkansas
drwxr-xr-x 2 aadi aadi 4.0K Jan 28 09:29 OriginStateName=California
drwxr-xr-x 2 aadi aadi 4.0K Jan 28 09:29 OriginStateName=Colorado
We can see that there is now an individual directory (each containing its own parquet files) for every distinct value of the partition column. Note that doing this for a column of high cardinality defeats the purpose of compaction.
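Before choosing a partition column it is worth sanity-checking its cardinality. A minimal sketch (ideally run against the table before re-writing it):
from pyspark.sql import functions as F
# A low distinct count (tens, not thousands) makes a reasonable partition column
(
    spark.read.format('delta')
    .load('/storage/data/airline.delta/')
    .agg(F.countDistinct('OriginStateName').alias('n_origin_states'))
    .show()
)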
To see the effect of this optimization, we can run the same filtered read before and after partitioning:
(
spark.read.format('delta')
.option('versionAsOf', 1) # before partitioning
.load('/storage/data/airline.delta/')
.filter('OriginStateName=="Alaska"')
).collect() # about 1.25 minutes
(
spark.read.format('delta')
# no version specified (i.e. after partitioning)
.load('/storage/data/airline.delta/')
.filter('OriginStateName=="Alaska"')
).collect() # about 30 seconds
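The speed-up comes from partition pruning: only the OriginStateName=Alaska directory is scanned. This can be confirmed in the physical plan (a sketch; the exact plan text varies by Spark version, but the partition filter should appear under PartitionFilters in the file scan):
(
    spark.read.format('delta')
    .load('/storage/data/airline.delta/')
    .filter('OriginStateName=="Alaska"')
).explain()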
Z-Ordering
Finally, the most complex type of Delta optimization gets its name from a locality-preserving technique known as the Z-order curve. It is the most involved of the techniques here, but for the purposes of its application in Delta tables, know that the method interleaves the binary representations of the values in the chosen columns. This allows related data (i.e. data that is geometrically closer together than other data) to be colocated in the same files. The technique reduces the amount of data Delta must read by omitting entirely the files that are not needed (data skipping).
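To make the interleaving concrete, here is a toy sketch of a two-column Morton (Z-order) encoding; it illustrates the idea rather than Delta's internal implementation:
def z_order_key(x: int, y: int, bits: int = 8) -> int:
    """Interleave the bits of x and y: x fills the even bit positions, y the odd ones."""
    key = 0
    for i in range(bits):
        key |= ((x >> i) & 1) << (2 * i)
        key |= ((y >> i) & 1) << (2 * i + 1)
    return key

# Points that are close in (x, y) tend to get nearby keys
print(z_order_key(3, 5))  # x=0b011, y=0b101 -> interleaved 0b100111 = 39
Sorting rows by such a key keeps rows with nearby values in both columns in the same files, which is what makes the per-file min/max statistics selective.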
Z-ordering applies best where the columns in the Z-order clause have similar properties in terms of range and distribution. For example, the OriginStateName and DestStateName columns may prove suitable candidates for Z-ordering, as they are not exceedingly high-cardinality and are both columns that may be used regularly in filter clauses.
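A sketch of applying it through the Python DeltaTable API (this assumes the table is not already partitioned by these columns, since Delta does not allow Z-ordering by a partition column; the equivalent SQL form is OPTIMIZE ... ZORDER BY):
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, '/storage/data/airline.delta/')
# Cluster files by both state columns so filters on either can skip files
dt.optimize().executeZOrderBy('OriginStateName', 'DestStateName')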
Z-ordering is not worthwhile where two columns are strongly correlated (e.g. Z-ordering by both departure and arrival time wastes compute, since ordering by one largely orders the other). It also does not differ significantly from plain lexical ordering for columns of very low cardinality. The following are cases where Z-ordering should be reconsidered:
Highly Dynamic Data: If your data updates frequently and involves insertions, deletions, and updates across the entire dataset, the Z-order might quickly become outdated and lose its effectiveness. Maintaining the Z-order efficiently in such scenarios can be cumbersome and resource-intensive.
Unpredictable Access Patterns: If your queries access data in a random or unpredictable manner, Z-ordering might not provide significant performance gains. It works best when you have predictable access patterns that align with the order you choose.
Small Datasets: For small datasets, the overhead of managing the Z-order might outweigh the performance benefits. Simple indexing might be a more efficient approach in such cases.
Complex Ordering Criteria: Setting up and maintaining a Z-order based on multiple, complex, or frequently changing criteria can be challenging and introduce additional maintenance overhead.
Data Skew: If your data exhibits significant skew towards certain values in the chosen Z-order columns, it might not improve access speed uniformly. In fact, it could even worsen performance for queries focusing on the less frequent values.
For the syntax of how Z-ordering is applied, see the Delta Table documentation.