Incremental ingestion with Databricks’ Autoloader via File notifications
What is Autoloader?
Autoloader (aka Auto Loader) is a mechanism in Databricks for incrementally ingesting data from a data lake. The power of Autoloader is that there is no need to set up a trigger for ingesting new data in the data lake - it automatically pulls new files into your streaming jobs once they land in the source location.
Autoloader listens for new files landing in your cloud source location and reads them into your streaming application. The data from each new file is then appended to (or overwrites) the Delta table you specify, depending on the configuration settings you've applied.
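For context, a minimal Auto Loader pipeline looks roughly like the sketch below. The source path, schema location, checkpoint location and table name are illustrative placeholders rather than values from this walkthrough:
# Minimal Auto Loader sketch (directory listing mode, the default)
streaming_df = (spark.readStream
    .format("cloudFiles")                               # Auto Loader source
    .option("cloudFiles.format", "csv")                 # format of the files landing in the lake
    .option("cloudFiles.schemaLocation", "/mnt/landing/_schema")
    .load("/mnt/landing/raw/")
)

# Each new file's rows are appended to the target Delta table as they arrive
(streaming_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/landing/_checkpoints/raw")
    .outputMode("append")
    .toTable("bronze.raw_events")
)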
Ingestion methods
There are two methods for guaranteeing exactly-once data processing using Autoloader when ingesting new files from the streaming query’s source location:
1. Directory listing
With directory listing, Autoloader identifies new files by listing the contents of the source directory each time the stream runs, and then ingests any files it hasn't seen before.
This is turned on by default and only requires access to your source directory. It is particularly useful when dealing with a small number of files at the source level.
2. File notification
This method only ingests files from the source location that were not included in its ingestion history.
It's the preferred method for ingesting a large number of files because Autoloader only listens for new files arriving in the source directory and pulls them into the streaming query, effectively ignoring files it has already ingested.
File notification mode requires additional cloud permissions to set up the following:
Notification service: to alert Autoloader when new files land in the source directory.
Queue service: to store the ingestion history of files already ingested by Autoloader.
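In code, the switch between the two ingestion methods comes down to the cloudFiles.useNotifications option. Here is a minimal sketch (the credentials that file notification mode also needs are covered in the next section):
# Minimal sketch: enabling file notification mode
# ("false", the default, means directory listing)
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.useNotifications", "true")
    .load(source_directory)
)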
Configuring Autoloader’s File notification approach
Unlike directory listing, the file notification approach requires configuration to work, which includes the following credentials:
Client ID
Client secret
Tenant ID
Subscription ID
Connection string
Resource group
Here's where to find each of these credentials in the Azure portal:
Client ID
Go to Azure Active Directory
Click on App registrations
Click on the app you've just created
The client ID is the same as the Application (client) ID, which should appear on this page
Client secret
Go to Azure Active Directory
Click on App registrations
Click on the app you've just created
Click on Certificates & secrets
The client secret is the value of the secret created from the previous steps, which may be masked at this stage.
Note: If you don't have the value of the secret, you may be required to create another one as they are only displayed during the creation process. Follow the previous steps to create a new client secret for your app.
Tenant ID
Go to Azure Active Directory
Under the Overview header, click on the Properties tab
The Tenant ID should appear on the page
Subscription ID
Go to the Azure portal
On the homepage click on Subscriptions under the Azure services pane
Click on the subscription of your choice
On the Overview page, you will find the subscription ID value next to the Subscription ID field
Connection string
Go to the Azure portal
On the homepage click on Storage accounts under the Azure services pane
Click on the storage account of your choice
Under the Security + networking pane, click on Access keys
Copy one of the connection strings you find
Resource group
Go to the Azure portal
On the homepage click on Resource groups under the Azure services pane
Click on the resource group of your choice
The resource group name should be at the top left-hand side of the page, above the Resource group sub-header
Note: If you are on Databricks Runtime 8.1 or above, you can authenticate Autoloader’s file notification mode with the service principal credentials (client ID and client secret) only. Any runtime lower than DBR 8.1 will also require a connection string alongside the service principal to do the same. See Microsoft’s official documentation for more info.
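To make that note concrete, here is a sketch of what the notification-related options might look like on DBR 8.1+, with the connection string dropped and only the service principal details kept (notification_auth_options is just a placeholder name, and this illustrates the claim above rather than a tested configuration):
# Sketch only: on DBR 8.1+ the connection string can reportedly be omitted
# and Auto Loader authenticates with the service principal alone
notification_auth_options = {
    "cloudFiles.useNotifications": "true",
    "cloudFiles.clientId": client_id,
    "cloudFiles.clientSecret": client_secret,
    "cloudFiles.tenantId": tenant_id,
    "cloudFiles.subscriptionId": subscription_id,
    "cloudFiles.resourceGroup": resource_group,
}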
Steps
1. Create an Azure AD app
Go to the Azure portal
Go to Azure Active Directory
Under the Manage header on the left-hand pane, click on App registrations, then click on New registration
Enter a name for the Azure AD app
In the Redirect URI section, select Web as the type then enter “localhost” as the URL.
Click Register
2. Grant the storage account access to create Storage queue messages
The Azure AD app you created needs to be assigned the following roles on the storage account:
Contributor - to create queues and event subscriptions
Storage Queue Data Contributor - to create, read and delete queues and queue messages
Here’s how to do it:
Go to Storage accounts and click on the storage account you want to grant access to
Click on Access control (IAM), then click on + Add button
Select Add role assignment
Select the Contributor and Storage Queue Data Contributor roles
Click Next
Click on +Select members and search for the Azure AD app you created in the previous steps
Click Select
Click Review + assign, then click on Review + assign for the next page
3. Grant the resource group access to subscribe to Event Grid topics
The Azure AD app also needs to be assigned the following role on the resource group:
EventGrid EventSubscription Contributor - to manage Event Grid subscription operations
Here’s how to do it:
Go to Resource groups and click on the resource group you want to grant access to
Click on Access control (IAM), then click on + Add button
Select Add role assignment
Select the EventGrid EventSubscription Contributor role
Click Next
Click on +Select members and search for the Azure AD app you created in the previous steps
Click Select
Click Review + assign, then click on Review + assign for the next page
4. Create Databricks secret scope & secrets
Instructions for this step can be found in this blog post here.
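Once the scope and secrets exist, the credentials can be loaded into the notebook with dbutils.secrets.get. The scope and key names below are placeholders for whatever you created in this step:
# Placeholder scope and key names - substitute the ones you created in step 4
scope = "autoloader-scope"

client_id         = dbutils.secrets.get(scope, "client-id")
client_secret     = dbutils.secrets.get(scope, "client-secret")
tenant_id         = dbutils.secrets.get(scope, "tenant-id")
subscription_id   = dbutils.secrets.get(scope, "subscription-id")
connection_string = dbutils.secrets.get(scope, "connection-string")
resource_group    = dbutils.secrets.get(scope, "resource-group")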
5. Set up configuration code in Databricks
Here is the configuration code:
autoloader_config = {
    "cloudFiles.format": "csv",                         # format of the incoming files
    "cloudFiles.clientId": client_id,                   # service principal credentials
    "cloudFiles.clientSecret": client_secret,
    "cloudFiles.tenantId": tenant_id,
    "cloudFiles.subscriptionId": subscription_id,
    "cloudFiles.connectionString": connection_string,   # storage account connection string
    "cloudFiles.resourceGroup": resource_group,
    "cloudFiles.schemaLocation": schema_location,       # where Autoloader stores the inferred schema
    "cloudFiles.useNotifications": "true",              # enable file notification mode
    "header": True                                      # CSV option: treat the first row as a header
}
Other checks
Make sure your subscription has Microsoft.EventGrid registered as a resource provider. You can check by following these steps:
Go to the Azure portal
Go to Subscriptions and select your subscription
Under the Settings pane on the left-hand menu click Resource providers
Search for Microsoft.EventGrid and check the status under the Status column
If it shows Not registered, click Register. After a few minutes, the status should change to Registered
Test that file notifications work by setting up your streaming DataFrame and checking numRows in the streaming query’s details:
streaming_df = (spark
.readStream
.format("cloudFiles")
.options(**autoloader_config)
.load(source_directory)
)
If you’re experiencing difficulty with the above, try setting each configuration option explicitly, like so:
streaming_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.clientId", client_id)
    .option("cloudFiles.clientSecret", client_secret)
    .option("cloudFiles.tenantId", tenant_id)
    .option("cloudFiles.subscriptionId", subscription_id)
    .option("cloudFiles.connectionString", connection_string)
    .option("cloudFiles.resourceGroup", resource_group)
    .option("cloudFiles.useNotifications", True)     # keep file notification mode enabled
    .option("cloudFiles.validateOptions", False)     # skip option validation while troubleshooting
    .option("header", True)                          # CSV option: treat the first row as a header
    .schema(streaming_df_schema)                     # your explicitly defined schema for the incoming files
    .load(source_directory)
)
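To confirm that ingestion is actually happening, start the stream writing to a Delta table, drop a new file into the source directory, and check that numInputRows (the numRows figure in the query details) is non-zero for the latest micro-batch. The checkpoint path and table name below are placeholders:
# Start the stream (placeholder checkpoint path and table name)
query = (streaming_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/autoloader_test")
    .toTable("default.autoloader_test")
)

# After a new file lands, the latest micro-batch should report a non-zero numInputRows
if query.recentProgress:
    print(query.recentProgress[-1]["numInputRows"])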
Feel free to reach out via my handles: LinkedIn | Email | Twitter