An Introduction to Delta Tables in Python

An Introduction to Delta Tables in Python

Delta Tables Part I

This post covers the Delta Lake, which is an open-source format extending parquet files for ACID transactions. More specifically, this covers how to work with Delta tables using the pyspark and native Delta APIs in python.

Delta tables can be thought of as having the benefits of a non-flat file format (compression via more efficient encoding), with a single source of truth called the transaction log.

Creating a Delta Table

To create a delta table, I'm loading an existing CSV using pyspark, and saving it using the format option in pyspark's write:

(Completely irrelevant, however, the dataset being used here is IBM's Airline Reporting Carrier On-Timer Performance Dataset)

# load original dataset
sdf = spark.read.load(
  '/storage/data/airline_2m.csv' ,
  format='com.databricks.spark.csv',
  header='true',
  inferSchema='true'
).select(['FlightDate', 'Reporting_Airline', 'Flight_Number_Reporting_Airline','Origin', 'Dest', 'DepTime', 'DepDelay', 'ArrTime', 'ArrDelay' ]).filter('Origin="JFK" and FlightDate>="2017-12-01" and FlightDate <= "2017-12-31"')

# write as a delta table
sdf.write.format('delta').mode('overwrite').save('/storage/data/airline_2m.delta')

# output
## [Stage 57:> (0 + 12) / 12]

Alternatively, we can use the CTAS statement in SQL: (note that this is for demonstration and new_table is not used for the rest of this notebook)

spark.sql('''
  CREATE OR REPLACE TABLE new_table
  using DELTA AS SELECT * FROM csv.`/storage/data/airline_2m.csv`
''')

## Output
[Stage 6:====> (1 + 11) / 12]

23/06/24 09:19:10 WARN MemoryManager: Total allocation exceeds 95.00% (951,320,564 bytes) of heap memory Scaling row group sizes to 88.60% for 8 writers

DataFrame[]

Now, if we inspect the path which was written, there are two things to note:

  1. There are multiple .parquet files

  2. There's a directory called the _delta_log.

/storage/data/airline_2m.delta
# output:
_delta_log/ 
part-00000-9db93c29-a618-4f69-aa5f-776e1ca1a221-c000.snappy.parquet 
part-00001-43218537-207b-4569-8d98-7cb1d2959d3d-c000.snappy.parquet 
part-00002-b41a2670-c5bc-4515-93c6-c9fe87c3d132-c000.snappy.parquet 
part-00003-0393fe9a-e8cc-4c69-83a4-e11828b75886-c000.snappy.parquet 
part-00004-edbda9cf-91b8-4752-bec3-f30e93651fe8-c000.snappy.parquet 
part-00005-9c275ad8-871a-4948-9630-40aef37c3d50-c000.snappy.parquet 
part-00006-d273f657-9c1f-4dd1-8bbf-fb66eba644f3-c000.snappy.parquet 
part-00007-91fbd325-4e1c-437f-a7c4-e0fd8e91b26d-c000.snappy.parquet 
part-00008-d6982b42-0653-4ef5-8b00-72f3bb62b7ce-c000.snappy.parquet 
part-00009-4539d215-00c7-4f2b-9fc7-cf8c7544070c-c000.snappy.parquet 
part-00010-4238061e-7d3c-43d5-9d29-9b4291b38d55-c000.snappy.parquet 
part-00011-00828917-003b-4eff-a175-81b3e86890cb-c000.snappy.parquet

This folder is called the _delta_log and is the single source of truth for the delta table, and contains all history for a given table; currently there is a single .json file, since only one operation was done to this table. This folder is automatically created when a table is created and is updated for every operation on the delta table.

/storage/data/airline_2m.delta/_delta_log
# Output
# 00000000000000000000.json
(jdf := spark.read.json("/storage/data/airline_2m.delta/_delta_log/00000000000000000000.json")).show()

# Output
+--------------------+--------------------+--------------------+--------+
|                 add|          commitInfo|            metaData|protocol|
+--------------------+--------------------+--------------------+--------+
|                null|{Apache-Spark/3.3...|                null|    null|
|                null|                null|                null|  {1, 2}|
|                null|                null|{1687486190991, {...|    null|
|{true, 1687486192...|                null|                null|    null|
|{true, 1687486192...|                null|                null|    null|
|{true, 1687486192...|                null|                null|    null|
|{true, 1687486192...|                null|                null|    null|
|{true, 1687486192...|                null|                null|    null|
|{true, 1687486192...|                null|                null|    null|
|{true, 1687486192...|                null|                null|    null|
|{true, 1687486192...|                null|                null|    null|
|{true, 1687486192...|                null|                null|    null|
|{true, 1687486192...|                null|                null|    null|
|{true, 1687486192...|                null|                null|    null|
|{true, 1687486192...|                null|                null|    null|
+--------------------+--------------------+--------------------+--------+

The above is a bit hard to see, so let's filter for the one entry where commitInfo is not null. In the commit info, there are several important parameters, namely the overwrite mode, the operation (in this case WRITE) and the timestamp.

from pyspark.sql import functions as F

jdf.filter(F.col('commitInfo').isNotNull()).select('commitInfo').show(truncate=False)
# Output
+--------------------------------------------------------------------------------------------------------------------------------------------------------+
|commitInfo                                                                                                                                              |
+--------------------------------------------------------------------------------------------------------------------------------------------------------+
|{Apache-Spark/3.3.2 Delta-Lake/2.3.0, false, Serializable, WRITE, {12, 33238, 75}, {Overwrite, []}, 1687486192334, 0e2eefc4-557d-45b2-ac9f-e1f56b484fbc}|
+--------------------------------------------------------------------------------------------------------------------------------------------------------+

The metadata stores information on the columns, type of columns, constraints on the columns and the type of file (parquet).

jdf.filter(F.col('metaData').isNotNull()).select('metaData').show(truncate=False)

# Output
|{1687486190991, {parquet}, e0aa9322-351f-40a6-9327-5d72059b0a0c, [], 
{"type":"struct",
    "fields":[
        {"name":"FlightDate","type":"timestamp","nullable":true,"metadata":{}},
        {"name":"Reporting_Airline","type":"string","nullable":true,"metadata":{}},
        {"name":"Flight_Number_Reporting_Airline","type":"integer","nullable":true,"metadata":{}},
        {"name":"Origin","type":"string","nullable":true,"metadata":{}},    
        {"name":"Dest","type":"string","nullable":true,"metadata":{}},
        {"name":"DepTime","type":"integer","nullable":true,"metadata":{}},
        {"name":"DepDelay","type":"double","nullable":true,"metadata":{}},
        {"name":"ArrTime","type":"integer","nullable":true,"metadata":{}},
        {"name":"ArrDelay","type":"double","nullable":true,"metadata":{}}]}}|

A more concise way of viewing this is by calling .history().show() on the delta table. Here we see metadata:

  • Version of the table

  • Timestamp of the given operation

  • UserID, Name (person/entity that performed the operation)

  • Operation Parameters: write mode, other config operations

  • Notebook/ClusterID: these are databricks specific and would not be populated here

  • Isolation Level: this controls write conflicts, and the degree to which a transaction must be isolated from modifications made by concurrent operations

# load table
delta_table = DeltaTable.forPath(spark, '/storage/data/airline_2m.delta/')
delta_table.history().show()

# Output
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      0|2023-06-22 22:09:...|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|       null|  Serializable|        false|{numFiles -> 12, ...|        null|Apache-Spark/3.3....|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+

Updating a Delta Table

Adding a Column

If I were to modify the table in any way, such as by adding a new row, this would be recorded in the transaction-log:

delta_table.toDF().count() # before the modification, we have 75 rows
# Output
75

Let's add a new column which is a function of DepDelay and ArrDelay. For example, say we want to observe the relationship between Departure and Arrival delays by means of a ratio:

from pyspark.sql import functions as F

delta_table = spark.read.format('delta').load('/storage/data/airline_2m.delta/')

delta_table_updated = (
    delta_table
      .withColumn('dep_to_arr_ratio', F.expr('round(DepDelay/ArrDelay, 3)'))
)

Great, let's write this back to the Delta path:

delta_table_updated.write.mode('overwrite').format('delta').save('/storage/data/airline_2m.delta/') # Hmm, that's not right!!

# Output
AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: e0aa9322-351f-40a6-9327-5d72059b0a0c).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- FlightDate: timestamp (nullable = true)
-- Reporting_Airline: string (nullable = true)
-- Flight_Number_Reporting_Airline: integer (nullable = true)
-- Origin: string (nullable = true)
-- Dest: string (nullable = true)
-- DepTime: integer (nullable = true)
-- DepDelay: double (nullable = true)
-- ArrTime: integer (nullable = true)
-- ArrDelay: double (nullable = true)


Data schema:
root
-- FlightDate: timestamp (nullable = true)
-- Reporting_Airline: string (nullable = true)
-- Flight_Number_Reporting_Airline: integer (nullable = true)
-- Origin: string (nullable = true)
-- Dest: string (nullable = true)
-- DepTime: integer (nullable = true)
-- DepDelay: double (nullable = true)
-- ArrTime: integer (nullable = true)
-- ArrDelay: double (nullable = true)
-- dep_to_arr_ratio: double (nullable = true)


To overwrite your schema or change partitioning, please set:
'.option("overwriteSchema", "true")'.

Note that the schema can't be overwritten when using
'replaceWhere'.

That's not right!!

In the above we get a schema mismatch error, this is because the original delta table did not contain our newly engineered column and implicitly enforces schema validation. To help identify the error, spark prints out the before and after schemas of the delta table. This is an incredibly stringent check and can be used as a gatekeeper of clean, fully transformed data that is ready for production. Schema enforcement prevents data "dilution" which can occur when new columns are added so frequently that the original data loses its meaning due to data deluge.

If you decide that you absolutely needed to add a new column, we use mode('overWriteSchema', 'true') in the write statement, also known as schema evolution in the Delta table documentation. There are two types of operations supported via schema evolution: adding new columns (what we're doing) and changing data types from Null to any other type OR upcasting Byte to Short or Integer.

Note, using the option overwrite here marks the original version of the data as "tombstone", which will cause this version to be removed if Delta's VACUUM command is run.

delta_table_updated.write.mode('overwrite').option('overwriteSchema', 'true').format('delta').save('/storage/data/airline_2m.delta/')

Now if we re-examine the delta-log, there are two files present here:

ls /storage/data/airline_2m.delta/_delta_log

# Output
00000000000000000000.json 00000000000000000001.json

If we view the delta-table history, we can see that a new version has been added, with a new timestamp and the type of operation. (Here we only select a few relevant columns for ease-of-viewing). In the operationParameters field, we can see where we passed in mode, and the in operationMetrics field, we can see that the number of parquet files has not changed (we didn't expect this to change) and neither did the number of rows. However, the numOutputBytes has changed due to the addition of our new column:

spark.sql('describe history delta.`/storage/data/airline_2m.delta/`').select(*['version', 'timestamp', 'operation', 'operationParameters', 'operationMetrics']).show(truncate=False)
versiontimestampoperationoperationParametersoperationMetrics
12023-06-24 09:26:21.574WRITE{mode -> Overwrite, partitionBy -> []}`{numFiles -> 12, numOutputRows -> 75, numOutputBytes -> 37214}

Upsert

This is hyper-specific to delta table syntax, where a combination of an insert and an update is done in one go. Let's say that I have a new table with updates to apply to the delta table:

from datetime import datetime


# create an updates dataframe
updates = [
    (datetime.strptime('2017-12-17', '%Y-%m-%d'), 'DL', 2634, 'JFK', 'MCO', 1125, 0.0, 1359, -32.0, 0.0), # update existing entry to have no departure delay
    (datetime.strptime('2017-12-26', '%Y-%m-%d'), 'DL',1368, 'JFK', 'MIA', 1107, 1.0, 1421, 1.0, 1.0), # new entry that did not exists
]
(updates_table := spark.createDataFrame(updates, schema=delta_table_updated.schema)).show()
FlightDateReporting_AirlineFlight_Number_Reporting_AirlineOriginDestDepTimeDepDelayArrTimeArrDelaydep_to_arr_ratio
2017-12-17 00:00:00DL2634JFKMCO11250.01359-32.00.0
2017-07-26 00:00:00DL1368JFKMIA11071.014211.01.0

This new data has both a row that matches an existing entry in our dataframe, as well as a new row that did not previously exist.

delta_table_updated.filter('Flight_Number_Reporting_Airline=1368').show()
FlightDateReporting_AirlineFlight_Number_Reporting_AirlineOriginDestDepTimeDepDelayArrTimeArrDelaydep_to_arr_ratio

This is the syntax for upserting:

table.alias('old_table')
  .merge(
    source=a_new_table.alias('a_new_table'),
    condition= # keys to match on, in this case we match on FlightData, Origin, Dest, Airline and Flight-Number
  )

Following this, there are multiple possible clauses, at least one of which is necessary:

  • whenNotMatchedInsert: adds new rows if no matches are made

  • whenMatchedUpdate: does not insert new rows, this is closest to an UPDATE statement in SQL

So, to update matches:

  .whenMatchedUpdate(set = {
    'Column1': F.col('a_new_table.Column1'), # use value from updates for column1
    'Column2': F.col('old_table.Column2'), # retain value from original dataset for column 2 
  })

Finally, to insert data where no matches are made:

  .whenNotMatchedInsertAll()

There can be any number of whenMatched clauses (at most one update and one delete) action. The update in the merge only updates the specified columns, and multiple whenMatched statements execute in the order specified. In order to update all the columns, use whenMatched(...).updateAll(), where ... would be your specified key matches. In the whenNotMatched case, this can only have the insert operation, where any unspecified column assume null values.

For the full API reference, see here

Performing the actual upsert, we specify conditions to match on FlightDate, Origin, Destination, Flight-Number and Airline, updating the columns for departure/arrival time/delay. Finally, there's a statement to insert new rows where they did not match.

(DeltaTable.forPath(spark, '/storage/data/airline_2m.delta/').alias('current_data')
  # specify merge conditions
  .merge(
      source=updates_table.alias('new_data'), 
      condition=F.expr('current_data.FlightDate = new_data.FlightDate and new_data.Origin = current_data.Origin and current_data.Dest = new_data.Dest and  current_data.Reporting_Airline = new_data.Reporting_Airline and current_data.Flight_Number_Reporting_Airline = new_data.Flight_Number_Reporting_Airline'))
  # update data where matched
  .whenMatchedUpdate(set = {
    'DepTime': F.col('new_data.DepTime'),
    'DepDelay': F.col('new_data.DepDelay'),
    'ArrTime': F.col('new_data.ArrTime'),
    'ArrDelay': F.col('new_data.ArrDelay'),
  })
  # insert where not matched
  .whenNotMatchedInsertAll()
  .execute()
 )
# we can see that a third entry has been added to the transaction log
ls /storage/data/airline_2m.delta/_delta_log

# Output
00000000000000000000.json 00000000000000000001.json 00000000000000000002.json
history = DeltaTable.forPath(spark, '/storage/data/airline_2m.delta/').history()
history.show() # that's a bit much, let's inspect individually
versiontimestampuserIduserNameoperationoperationParametersjobnotebookclusterIdreadVersionisolationLevelisBlindAppendoperationMetricsuserMetadataengineInfo
22023-06-24 09:59:...nullnullMERGE{predicate -> (((...nullnullnull1Serializablefalse{numTargetRowsCop...nullApache-Spark/3.3....
12023-06-24 09:26:...nullnullWRITE{mode -> Overwrit...nullnullnull0Serializablefalse{numFiles -> 12, ...nullApache-Spark/3.3....
02023-06-22 22:09:...nullnullWRITE{mode -> Overwrit...nullnullnullnullSerializablefalse{numFiles -> 12, ...nullApache-Spark/3.3....
history.select(*['timestamp', 'version','operation']).show() # there is now a third version (number 2), with an operation MERGE
timestampversionoperation
2023-06-24 09:59:...2MERGE

operationParameters shows us the substance of the operation, where the predicate outlines our merge conditions and the defined action (in this case an update), followed by a predicate for non-matched rows:

history.filter('version=2').select('operationParameters').show(truncate=False)
operationParameters
{predicate -> ((((current_data.FlightDate = new_data.FlightDate) AND (new_data.Origin = current_data.Origin)) AND (current_data.Dest = new_data.Dest)) AND ((current_data.Reporting_Airline = new_data.Reporting_Airline) AND (current_data.Flight_Number_Reporting_Airline = new_data.Flight_Number_Reporting_Airline))), matchedPredicates -> [{"actionType":"update"}], notMatchedPredicates -> [{"actionType":"insert"}], notMatchedBySourcePredicates -> []}

Finally, if we view operation metrics, we get a readout for the number of rows added/deleted, files added/deleted, the number of input/output rows etc etc. This is the most useful for logging operations performed on a delta table, and can be used to easily catch any spurios operations, such as the addition of extra rows, schema implosion and file cloning.

history.filter('version=2').select('operationMetrics').show(truncate=False)
operationMetrics
{numTargetRowsCopied -> 7, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, numTargetBytesAdded -> 3209, numTargetBytesRemoved -> 3218, numTargetRowsMatchedUpdated -> 1, executionTimeMs -> 875, numTargetRowsInserted -> 1, numTargetRowsMatchedDeleted -> 0, scanTimeMs -> 515, numTargetRowsUpdated -> 1, numOutputRows -> 9, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 1, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 340}

Time-Travel

Now, possibly the most useful feature of having a transaction-log is "time-travel". This is essentially being able to access versions of the delta at different points in its lifetime. Let's re-examine the history:

history.select(*['version', 'timestamp']).show()
versiontimestamp
22023-06-24 09:59:...
12023-06-24 09:26:...
02023-06-22 22:09:...

We can see 3 versions, the original, and two operations. In order to access these previous versions, we need to use one of the (many) ways of accessing previous revisions:

# reading version 0, we can see that the engineered column added in version 1 does not exist as yet
(spark.read.format('delta').option('versionAsOf', 0).load('/storage/data/airline_2m.delta/')).printSchema()

# Output
root
 |-- FlightDate: timestamp (nullable = true)
 |-- Reporting_Airline: string (nullable = true)
 |-- Flight_Number_Reporting_Airline: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- ArrDelay: double (nullable = true)

Let's read version 1 using a slightly different syntax:

history.select(*['version', 'timestamp']).show(truncate=False)
versiontimestamp
22023-06-24 09:59:29.027
12023-06-24 09:26:21.574
02023-06-22 22:09:52.338

In this version, our new column has been added:

(spark.read.format('delta').option('timestampAsOf', '2023-06-24 09:26:21.574').load('/storage/data/airline_2m.delta/')).printSchema()

# output
root
 |-- FlightDate: timestamp (nullable = true)
 |-- Reporting_Airline: string (nullable = true)
 |-- Flight_Number_Reporting_Airline: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- dep_to_arr_ratio: double (nullable = true)

Finally, if we view version 2, we can see the difference in the number of rows:

(spark.read.format('delta').option('versionAsOf', 1).load('/storage/data/airline_2m.delta/')).count(), (spark.read.format('delta').option('versionAsOf', 2).load('/storage/data/airline_2m.delta/')).count()

# output
# (75, 76)

Vacuum

Finally, one of the most unique features of time-travel is the vacuum. Time-travel is enabled via the transaction log, however it would be inefficient to store every version of every table forever. As aforementioned, using the overwrite option marks previous versions of data for being removed via the vacuum command. The other condition for removing files via vacuum is files existing after some given time period (by default it's 7 days). If we observe the delta directory now (after three operations):

  /home ❯ tree /storage/data/airline_2m.delta                                  at  10:43:52
/storage/data/airline_2m.delta
├── _delta_log
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.json
│   └── 00000000000000000002.json
├── part-00000-716191cb-67cf-4e63-a06d-c3943fb45664-c000.snappy.parquet
├── part-00000-74403ae3-1703-4c27-aa4c-7de9bd1e6fbe-c000.snappy.parquet
├── part-00000-9db93c29-a618-4f69-aa5f-776e1ca1a221-c000.snappy.parquet
├── part-00001-43218537-207b-4569-8d98-7cb1d2959d3d-c000.snappy.parquet
├── part-00001-e1a760f6-c163-4aa5-b327-fd97f40d8509-c000.snappy.parquet
├── part-00002-17bd3789-0052-4a37-aa0a-bd91b4b1fbc7-c000.snappy.parquet
├── part-00002-b41a2670-c5bc-4515-93c6-c9fe87c3d132-c000.snappy.parquet
├── part-00003-0393fe9a-e8cc-4c69-83a4-e11828b75886-c000.snappy.parquet
├── part-00003-bad613ae-6925-4578-9c0e-dfd43d31a8f7-c000.snappy.parquet
├── part-00004-4846b914-1702-446b-861a-52c9e9f080f0-c000.snappy.parquet
├── part-00004-edbda9cf-91b8-4752-bec3-f30e93651fe8-c000.snappy.parquet
├── part-00005-61394241-d08a-4dbc-9f9e-8a2a904c0a18-c000.snappy.parquet
├── part-00005-9c275ad8-871a-4948-9630-40aef37c3d50-c000.snappy.parquet
├── part-00006-1829cf9d-4451-4882-9587-2a92cfdff619-c000.snappy.parquet
├── part-00006-d273f657-9c1f-4dd1-8bbf-fb66eba644f3-c000.snappy.parquet
├── part-00007-91fbd325-4e1c-437f-a7c4-e0fd8e91b26d-c000.snappy.parquet
├── part-00007-aaf31339-c029-4955-80f5-2649b4fe6caf-c000.snappy.parquet
├── part-00008-2340179d-0109-4c50-89ba-f31242beba14-c000.snappy.parquet
├── part-00008-d6982b42-0653-4ef5-8b00-72f3bb62b7ce-c000.snappy.parquet
├── part-00009-4539d215-00c7-4f2b-9fc7-cf8c7544070c-c000.snappy.parquet
├── part-00009-5b136823-d160-403d-b8aa-6fe6126aad2d-c000.snappy.parquet
├── part-00010-4238061e-7d3c-43d5-9d29-9b4291b38d55-c000.snappy.parquet
├── part-00010-c8695154-ff81-43d7-b7c3-f2687192ce7a-c000.snappy.parquet
├── part-00011-00828917-003b-4eff-a175-81b3e86890cb-c000.snappy.parquet
└── part-00011-7fc1ae81-7e4e-4dfd-b8be-fd5a0c43e127-c000.snappy.parquet

1 directory, 28 files
# we need to disable some safety checks
spark.conf.set('spark.databricks.delta.retentionDurationCheck.enabled','false')
spark.sql('vacuum airline_2m retain 0 hours').show(truncate=False)

The output is as follows

Deleted 0 files and directories in a total of 1 directories.
+-------------------------------------------------------------+
|path                                                         |
+-------------------------------------------------------------+
|file:/storage/projects/notes/tools/spark-warehouse/airline_2m|
+-------------------------------------------------------------+

The above would show files removed if there were older files in the delta-history. The safety check is disabled since running VACUUM for files that existed less than a week is generally not desirable.