This is a dlt source you can use to extract data from any REST API. It uses declarative configuration to define the API endpoints, their relationships, how to handle pagination, and authentication.
Quick example#
Here's an example of how to configure the REST API source to load posts and related comments from a hypothetical blog API:
import dlt
from dlt.sources.rest_api import rest_api_source
source = rest_api_source({
"client": {
"base_url": "https://api.example.com/",
"auth": {
"token": dlt.secrets["your_api_token"],
},
"paginator": {
"type": "json_link",
"next_url_path": "paging.next",
},
},
"resources": [
# "posts" will be used as the endpoint path, the resource name,
# and the table name in the destination. The HTTP client will send
# a request to "https://api.example.com/posts".
"posts",
# The explicit configuration allows you to link resources
# and define query string parameters.
{
"name": "comments",
"endpoint": {
"path": "posts/{resources.posts.id}/comments",
"params": {
"sort": "created_at",
},
},
},
],
})
pipeline = dlt.pipeline(
pipeline_name="rest_api_example",
destination="duckdb",
dataset_name="rest_api_data",
)
load_info = pipeline.run(source)
Running this pipeline will create two tables in DuckDB: posts and comments with the data from the respective API endpoints. The comments resource will fetch comments for each post by using the id field from the posts resource.
Setup#
Prerequisites#
Please make sure the dlt library is installed. Refer to the installation guide.
Initialize the REST API source#
Enter the following command in your terminal:
dlt init rest_api duckdb
dlt init will initialize the pipeline examples for REST API as the source and duckdb as the destination.
Running dlt init creates the following in the current folder:
- rest_api_pipeline.py file with sample pipeline definitions:
  - GitHub API example
  - Pokemon API example
- .dlt folder with:
  - secrets.toml file to store your access tokens and other sensitive information
  - config.toml file to store the configuration settings
- requirements.txt file with the required dependencies
Change the REST API source to your needs by modifying the rest_api_pipeline.py file. See the detailed source configuration section below.
This source is based on the RESTClient class.
Add credentials#
In the .dlt folder, you'll find a file called secrets.toml, where you can securely store your access tokens and other sensitive information. It's important to handle this file with care and keep it safe.
The GitHub API requires an access token to access some of its endpoints and to increase the rate limit for the API calls. To get a GitHub token, follow the GitHub documentation on managing your personal access tokens.
After you get the token, add it to the secrets.toml file:
[sources.rest_api_pipeline.github]
github_token = "your_github_token"
Run the pipeline#
- Install the required dependencies by running the following command:
  pip install -r requirements.txt
- Run the pipeline:
  python rest_api_pipeline.py
- Verify that everything loaded correctly by using the following command:
  dlt pipeline rest_api show
Source configuration#
Quick example#
Let's take a look at the GitHub example in the rest_api_pipeline.py file:
import dlt
from dlt.sources.rest_api import RESTAPIConfig, rest_api_resources

@dlt.source
def github_source(github_token=dlt.secrets.value):
    config: RESTAPIConfig = {
        "client": {
            "base_url": "https://api.github.com/repos/dlt-hub/dlt/",
            "auth": {
                "token": github_token,
            },
        },
        "resource_defaults": {
            "primary_key": "id",
            "write_disposition": "merge",
            "endpoint": {
                "params": {
                    "per_page": 100,
                },
            },
        },
        "resources": [
            {
                "name": "issues",
                "endpoint": {
                    "path": "issues",
                    "params": {
                        "sort": "updated",
                        "direction": "desc",
                        "state": "open",
                        "since": {
                            "type": "incremental",
                            "cursor_path": "updated_at",
                            "initial_value": "2024-01-25T11:21:28Z",
                        },
                    },
                },
            },
            {
                "name": "issue_comments",
                "endpoint": {
                    "path": "issues/{resources.issues.number}/comments",
                },
                "include_from_parent": ["id"],
            },
        ],
    }
    yield from rest_api_resources(config)

def load_github() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="rest_api_github",
        destination="duckdb",
        dataset_name="rest_api_data",
    )
    load_info = pipeline.run(github_source())
    print(load_info)
The declarative resource configuration is defined in the config dictionary. It contains the following key components:
- client: Defines the base URL and authentication method for the API. In this case, it uses token-based authentication. The token is stored in the secrets.toml file.
- resource_defaults: Contains default settings for all resources. In this example, we define that all resources:
  - Have id as the primary key.
  - Use the merge write disposition to merge the data with the existing data in the destination.
  - Send a per_page=100 query parameter with each request to get more results per page.
- resources: A list of resources to be loaded. Here, we have two resources: issues and issue_comments, which correspond to the GitHub API endpoints for repository issues and issue comments. Note that we need an issue number to fetch comments for each issue. This number is taken from the issues resource. More on this in the resource relationships section.
Let's break down the configuration in more detail.
Configuration structure#
The configuration object passed to the REST API Generic Source has three main elements:
config: RESTAPIConfig = {
"client": {
# ...
},
"resource_defaults": {
# ...
},
"resources": [
# ...
],
}
client#
The client configuration is used to connect to the API's endpoints. It includes the following fields:
- base_url (str): The base URL of the API. This string is prepended to all endpoint paths. For example, if the base URL is https://api.example.com/v1/, and the endpoint path is users, the full URL will be https://api.example.com/v1/users.
- headers (dict, optional): Additional headers that are sent with each request. See the headers configuration section for more details.
- auth (optional): Authentication configuration. This can be a simple token, an AuthConfigBase object, or a more complex authentication method.
- session (requests.Session, optional): A custom session object. When provided, this session will be used for all HTTP requests instead of the default session. Can be used, for example, to set up custom timeouts and retry strategies, or with requests-oauthlib for OAuth authentication.
- paginator (optional): Configuration for the default pagination used for resources that support pagination. Refer to the pagination section for more details.
resource_defaults (optional)#
resource_defaults contains the default values to configure the dlt resources. This configuration is applied to all resources unless overridden by the resource-specific configuration.
For example, you can set the primary key, write disposition, and other default settings here:
config = {
"client": {
# ...
},
"resource_defaults": {
"primary_key": "id",
"write_disposition": "merge",
"endpoint": {
"params": {
"per_page": 100,
},
},
},
"resources": [
"resource1",
{
"name": "resource2_name",
"write_disposition": "append",
"endpoint": {
"params": {
"param1": "value1",
},
},
}
],
}
Above, all resources will have primary_key set to id, resource1 will have write_disposition set to merge, and resource2 will override the default write_disposition with append.
Both resource1 and resource2 will have the per_page parameter set to 100.
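The way defaults combine with a resource-specific configuration can be pictured as a recursive dictionary merge. The sketch below is only an illustration of the outcome described above; the apply_defaults helper is hypothetical and is not dlt's actual implementation.

```python
from copy import deepcopy
from typing import Any, Dict


def apply_defaults(defaults: Dict[str, Any], resource: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict: defaults overlaid with resource-specific values.

    Nested dictionaries are merged key by key; any other value in
    `resource` replaces the default. Hypothetical helper for illustration.
    """
    merged = deepcopy(defaults)
    for key, value in resource.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = apply_defaults(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


resource_defaults = {
    "primary_key": "id",
    "write_disposition": "merge",
    "endpoint": {"params": {"per_page": 100}},
}
resource2 = {
    "name": "resource2_name",
    "write_disposition": "append",
    "endpoint": {"params": {"param1": "value1"}},
}

effective = apply_defaults(resource_defaults, resource2)
print(effective)
```

Here the resource keeps the default primary key and per_page parameter, while its own write disposition and param1 take effect.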
resources#
This is a list of resource configurations that define the API endpoints to be loaded. Each resource configuration can be:
- a dictionary with the resource configuration.
- a string. In this case, the string is used as both the endpoint path and the resource name, and the resource configuration is taken from the resource_defaults configuration if it exists.
Resource configuration#
A resource configuration is used to define a dlt resource for the data to be loaded from an API endpoint. When defining the resource you may specify:
- dlt resource parameters, for example:
  - name: The name of the resource. This is also used as the table name in the destination unless overridden by the table_name parameter.
  - write_disposition: The write disposition for the resource.
  - primary_key: The primary key for the resource.
  - table_name: Override the table name for this resource.
  - max_table_nesting: Sets the maximum depth of nested tables above which the remaining nodes are loaded as structs or JSON.
  - selected: A flag to indicate if the resource is selected for loading. This could be useful when you want to load data only from child resources and not from the parent resource.

  See the dlt resource API reference for more details.
- rest_api specific parameters, such as:
  - endpoint: The endpoint configuration for the resource. It can be a string or a dict representing the endpoint settings. See the endpoint configuration section for more details.
  - include_from_parent: A list of fields from the parent resource to be included in the resource output. See the resource relationships section for more details.
  - processing_steps: A list of processing steps to filter and transform your data.
  - auth: An optional AuthConfig instance. If passed, it is used instead of the one defined in the client definition.
Example:
from dlt.sources.helpers.rest_client.auth import HttpBasicAuth
config = {
"client": {
"auth": {
"type": "bearer",
"token": dlt.secrets["your_api_token"],
}
},
"resources": [
"resource-using-bearer-auth",
{
"name": "my-resource-with-special-auth",
"write_disposition": "merge",
"table_name": "my_custom_table",
"endpoint": {
# ...
"auth": HttpBasicAuth("user", dlt.secrets["your_basic_auth_password"])
},
# ...
}
]
# ...
}
This would use Bearer auth as defined in the client for resource-using-bearer-auth and HTTP Basic auth for my-resource-with-special-auth.
Endpoint configuration#
The endpoint configuration defines how to query the API endpoint. Quick example:
{
"path": "issues",
"method": "GET",
"params": {
"sort": "updated",
"direction": "desc",
"state": "open",
"since": {
"type": "incremental",
"cursor_path": "updated_at",
"initial_value": "2024-01-25T11:21:28Z",
},
},
"data_selector": "results",
}
The fields in the endpoint configuration are:
- path: The path to the API endpoint. By default, this path is appended to the given base_url. If this is a fully qualified URL starting with http: or https:, it will be used as-is and base_url will be ignored.
- method: The HTTP method to be used. The default is GET.
- headers: Additional headers specific to this endpoint. See the headers configuration section for more details.
- params: Query parameters to be sent with each request. For example, sort to order the results or since to specify incremental loading. This may also be used to define resource relationships.
- json: The JSON payload to be sent with the request (for POST and PUT requests).
- data: The data payload to be sent with the request body. Can be a dictionary (form-encoded) or string. Mutually exclusive with the json parameter. Use this for APIs that expect form-encoded data or raw payloads instead of JSON.
- paginator: Pagination configuration for the endpoint. See the pagination section for more details.
- data_selector: A JSONPath to select the data from the response. See the data selection section for more details.
- response_actions: A list of actions that define how to process the response data. See the response actions section for more details.
- incremental: Configuration for incremental loading.
Pagination#
The REST API source will try to automatically handle pagination for you. This works by detecting the pagination details from the first API response.
In some special cases, you may need to specify the pagination configuration explicitly.
To specify the pagination configuration, use the paginator field in the client, resource_defaults, or endpoint configurations (see pagination configuration hierarchy). You may either use a dictionary with a string alias in the type field along with the required parameters, or use a paginator class instance.
Example#
Suppose the API response for https://api.example.com/posts contains a next field with the URL to the next page:
{
"data": [
{"id": 1, "title": "Post 1"},
{"id": 2, "title": "Post 2"},
{"id": 3, "title": "Post 3"}
],
"pagination": {
"next": "https://api.example.com/posts?page=2"
}
}
You can configure the pagination for the posts resource like this:
{
"path": "posts",
"paginator": {
"type": "json_link",
"next_url_path": "pagination.next",
}
}
Alternatively, you can use the paginator instance directly:
from dlt.sources.helpers.rest_client.paginators import JSONLinkPaginator
# ...
{
"path": "posts",
"paginator": JSONLinkPaginator(
next_url_path="pagination.next"
),
}
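To make the json_link behaviour concrete, here is a minimal sketch of the loop such a paginator implies: read a page, yield its items, then follow the URL found at next_url_path until none is left. It runs against in-memory fake responses; FAKE_API, get_by_path, and paginate are hypothetical names used only for this illustration and are not part of dlt.

```python
from typing import Any, Dict, Iterator, List, Optional

# Fake responses keyed by URL; stands in for real HTTP calls.
FAKE_API: Dict[str, Dict[str, Any]] = {
    "https://api.example.com/posts": {
        "data": [{"id": 1}, {"id": 2}],
        "pagination": {"next": "https://api.example.com/posts?page=2"},
    },
    "https://api.example.com/posts?page=2": {
        "data": [{"id": 3}],
        "pagination": {"next": None},
    },
}


def get_by_path(body: Dict[str, Any], dotted_path: str) -> Optional[Any]:
    """Follow a dot-separated path such as "pagination.next"."""
    current: Any = body
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def paginate(start_url: str, next_url_path: str) -> Iterator[List[Dict[str, Any]]]:
    """Yield one page of items at a time until no next link is found."""
    url: Optional[str] = start_url
    while url:
        body = FAKE_API[url]
        yield body["data"]
        url = get_by_path(body, next_url_path)


pages = list(paginate("https://api.example.com/posts", "pagination.next"))
print(pages)
```

The simple dotted-path lookup is a simplification; the real configuration accepts JSONPath expressions.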
:::tip JSONPath escaping for special characters
When working with APIs that use field names containing special characters (like dots, @ symbols, or other reserved JSONPath characters), you need to escape the field names using bracket notation.
For example, Microsoft Graph API uses @odata.nextLink for pagination. To access this field, use bracket notation with quotes:
{
"path": "users",
"paginator": {
"type": "json_link",
"next_url_path": "['@odata.nextLink']", # Escaped using bracket notation
}
}
Refer to the JSONPath syntax for more details.
:::
These are the available paginators:
| type | Paginator class | Description |
|---|---|---|
| json_link | JSONLinkPaginator | The link to the next page is in the body (JSON) of the response. |
| header_link | HeaderLinkPaginator | The links to the next page are in the response headers. |
| header_cursor | HeaderCursorPaginator | The cursor for the next page is in the response headers. |
| offset | OffsetPaginator | The pagination is based on an offset parameter, with the total items count either in the response body or explicitly provided. |
| page_number | PageNumberPaginator | The pagination is based on a page number parameter, with the total pages count either in the response body or explicitly provided. |
| cursor | JSONResponseCursorPaginator | The pagination is based on a cursor parameter, with the value of the cursor in the response body (JSON). Provide either cursor_param or cursor_body_path, but not both. If neither is provided, cursor_param will default to "cursor". |
| single_page | SinglePagePaginator | The response will be interpreted as a single-page response, ignoring possible pagination metadata. |
| auto | None | Explicitly specify that the source should automatically detect the pagination method. |
Pagination configuration hierarchy#
Paginators are applied in the following order of precedence:
- Endpoint-level paginator: defined in individual resource endpoint configurations.
- Resource defaults paginator: defined in resource_defaults.endpoint.paginator.
- Client-level paginator: defined in the client configuration. This is the lowest priority.
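The precedence rule can be sketched as a lookup that walks the three levels from most to least specific. The resolve_paginator function below is a hypothetical illustration of that rule, not a dlt function; returning None here simply stands for "nothing configured explicitly".

```python
from typing import Any, Dict, Optional


def resolve_paginator(
    client: Dict[str, Any],
    resource_defaults: Dict[str, Any],
    endpoint: Dict[str, Any],
) -> Optional[Any]:
    """Pick the paginator from the most specific level that defines one."""
    levels = (
        endpoint,                               # highest priority
        resource_defaults.get("endpoint", {}),  # middle priority
        client,                                 # lowest priority
    )
    for level in levels:
        if "paginator" in level:
            return level["paginator"]
    return None  # nothing configured at any level


client = {
    "base_url": "https://api.example.com/",
    "paginator": {"type": "json_link", "next_url_path": "paging.next"},
}
defaults = {"endpoint": {"paginator": {"type": "header_link"}}}

from_endpoint = resolve_paginator(
    client, defaults, {"path": "posts", "paginator": {"type": "single_page"}}
)
from_defaults = resolve_paginator(client, defaults, {"path": "posts"})
from_client = resolve_paginator(client, {}, {"path": "posts"})
print(from_endpoint, from_defaults, from_client)
```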
Single entity endpoint detection#
The REST API source tries to automatically detect endpoints that return single entities (i.e., not paginated lists of items). It checks whether the path contains any placeholders with references to other resources (for example users/{resources.users.id}/ or user/{id}) and, if so, applies special handling:
- If no paginator is explicitly configured at endpoint or resource defaults level, these endpoints automatically use SinglePagePaginator.
- If no data_selector is explicitly configured, these endpoints automatically use "$" to select the entire response.
Custom paginators#
For more complex pagination methods, you can implement a custom paginator, instantiate it, and use it in the configuration.
Alternatively, you can use the dictionary configuration syntax also for custom paginators. For this, you need to register your custom paginator:
from dlt.sources.helpers.rest_client.paginators import SinglePagePaginator
from dlt.sources.rest_api.config_setup import register_paginator

class CustomPaginator(SinglePagePaginator):
    # custom implementation of SinglePagePaginator
    pass

register_paginator("custom_paginator", CustomPaginator)
{
# ...
"paginator": {
"type": "custom_paginator",
"next_url_path": "paging.nextLink",
}
}
Data selection#
The data_selector field in the endpoint configuration allows you to specify a JSONPath to select the data from the response. By default, the source will try to detect the locations of the data automatically.
Use this field when you need to specify the location of the data in the response explicitly.
For example, if the API response looks like this:
{
"posts": [
{"id": 1, "title": "Post 1"},
{"id": 2, "title": "Post 2"},
{"id": 3, "title": "Post 3"}
]
}
You can use the following endpoint configuration:
{
"path": "posts",
"data_selector": "posts",
}
For a nested structure like this:
{
"results": {
"posts": [
{"id": 1, "title": "Post 1"},
{"id": 2, "title": "Post 2"},
{"id": 3, "title": "Post 3"}
]
}
}
You can use the following endpoint configuration:
{
"path": "posts",
"data_selector": "results.posts",
}
Read more about JSONPath syntax to learn how to write selectors.
Authentication#
For APIs that require authentication to access their endpoints, the REST API source supports various authentication methods, including token-based authentication, query parameters, basic authentication, and custom authentication. The authentication configuration is specified in the auth field of the client either as a dictionary or as an instance of the authentication class.
Quick example#
Here's how to configure authentication using a bearer token:
{
"client": {
# ...
"auth": {
"type": "bearer",
"token": dlt.secrets["your_api_token"],
},
# ...
},
}
Alternatively, you can use the authentication class directly:
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
config = {
"client": {
"auth": BearerTokenAuth(dlt.secrets["your_api_token"]),
},
"resources": [
]
# ...
}
Since token-based authentication is one of the most common methods, you can use the following shortcut:
{
"client": {
# ...
"auth": {
"token": dlt.secrets["your_api_token"],
},
# ...
},
}
Available authentication types:
type | Authentication class | Description |
|---|---|---|
bearer | BearerTokenAuth | Bearer token authentication. Parameters:
|
http_basic | HTTPBasicAuth | Basic HTTP authentication. Parameters:
|
api_key | APIKeyAuth | API key authentication with key defined in the query parameters or in the headers. Parameters:
|
oauth2_client_credentials | OAuth2ClientCredentials | OAuth 2.0 Client Credentials authorization for server-to-server communication without user consent. Parameters:
|
For more complex authentication methods, you can implement a custom authentication class and use it in the configuration.
You can use the dictionary configuration syntax also for custom authentication classes after registering them as follows:
from dlt.common.configuration import configspec
from dlt.sources.helpers.rest_client.auth import AuthConfigBase
from dlt.sources.rest_api.config_setup import register_auth

@configspec
class CustomAuth(AuthConfigBase):
    pass

register_auth("custom_auth", CustomAuth)
{
# ...
"auth": {
"type": "custom_auth",
"api_key": dlt.secrets["sources.my_source.my_api_key"],
}
}
Define resource relationships#
When you have a resource that depends on another resource (for example, you must fetch a parent resource to get an ID needed to fetch the child), you can reference fields in the parent resource using special placeholders.
This allows you to link one or more path, query string or JSON body parameters in the child resource to fields in the parent resource's data.
Via request path#
In the GitHub example, the issue_comments resource depends on the issues resource. The resources.issues.number placeholder links the number field in the issues resource data to the current request's path parameter.
{
"resources": [
{
"name": "issues",
"endpoint": {
"path": "issues",
# ...
},
},
{
"name": "issue_comments",
"endpoint": {
"path": "issues/{resources.issues.number}/comments",
},
"include_from_parent": ["id"],
},
],
}
This configuration tells the source to get issue numbers from the issues resource data and use them to fetch comments for each issue number. So for each issue item, "{resources.issues.number}" is replaced by the issue number in the request path.
For example, if the issues resource yields the following data:
[
{"id": 1, "number": 123},
{"id": 2, "number": 124},
{"id": 3, "number": 125}
]
The issue_comments resource will make requests to the following endpoints:
- issues/123/comments
- issues/124/comments
- issues/125/comments
The syntax for the placeholder is resources.<parent_resource_name>.<field_name>.
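As a rough illustration of what this placeholder expansion amounts to, the sketch below fills the placeholder once per parent item using a regular expression. The expand_path helper and its regex are assumptions made for this example; they only mimic the described behaviour and are not how dlt implements it.

```python
import re
from typing import Any, Dict, List

# Matches placeholders of the form {resources.<parent_resource_name>.<field_name>}
PLACEHOLDER = re.compile(r"\{resources\.(\w+)\.(\w+)\}")


def expand_path(
    template: str, parent_name: str, parent_items: List[Dict[str, Any]]
) -> List[str]:
    """Return one request path per parent item with the placeholder filled in."""

    def fill(item: Dict[str, Any]) -> str:
        def replace(match: "re.Match[str]") -> str:
            resource, field = match.group(1), match.group(2)
            if resource != parent_name:
                raise ValueError(f"unknown parent resource: {resource}")
            return str(item[field])

        return PLACEHOLDER.sub(replace, template)

    return [fill(item) for item in parent_items]


issues = [
    {"id": 1, "number": 123},
    {"id": 2, "number": 124},
    {"id": 3, "number": 125},
]
paths = expand_path("issues/{resources.issues.number}/comments", "issues", issues)
print(paths)
```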
Via query string parameters#
The placeholder syntax can also be used in the query string parameters. For example, in an API which lets you fetch blog posts (via /posts) and their comments (via /comments?post_id=<post_id>), you can define a resource posts and a resource post_comments which depends on the posts resource. You can then reference the id field from the posts resource in the post_comments resource:
{
"resources": [
"posts",
{
"name": "post_comments",
"endpoint": {
"path": "comments",
"params": {
"post_id": "{resources.posts.id}",
},
},
},
],
}
Similar to the GitHub example above, if the posts resource yields the following data:
[
{"id": 1, "title": "Post 1"},
{"id": 2, "title": "Post 2"},
{"id": 3, "title": "Post 3"}
]
The post_comments resource will make requests to the following endpoints:
- comments?post_id=1
- comments?post_id=2
- comments?post_id=3
Via JSON body#
In many APIs, you can send a complex query or configuration through a POST request's JSON body rather than in the request path or query parameters. For example, consider an imaginary /search endpoint that supports multiple filters and settings. You might have a parent resource posts with each post's id and a second resource, post_details, that uses id to perform a custom search.
In the example below we reference the posts resource's id field in the JSON body via placeholders:
{
"resources": [
"posts",
{
"name": "post_details",
"endpoint": {
"path": "search",
"method": "POST",
"json": {
"filters": {
"id": "{resources.posts.id}",
},
"order": "desc",
"limit": 5,
}
},
},
],
}
:::tip Escaping curly braces
When your API requires literal curly braces in parameters (e.g., for JSON filters or GraphQL queries), escape them by doubling: {{ and }}.
Example with GraphQL query:
{
"json": {
"query": """
query Artist {{
artist(id: "{resources.artist_list.id}") {{
id
name
}}
}}
"""
}
}
Use the same technique for GET request query string parameters:
{
"params": {
"search": "{{'filters': [{{'id': 42}}]}}"
}
}
:::
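The doubling rule is the same convention Python's own str.format uses for literal braces, which makes it easy to preview what an escaped template should look like after substitution. The sketch below uses plain str.format with a hypothetical post_id field; whether dlt resolves placeholders with exactly these semantics is an assumption, so treat this only as a way to reason about the escaping.

```python
# Doubled braces become literal braces; single braces mark a placeholder.
escaped_only = "{{'filters': [{{'id': 42}}]}}"
print(escaped_only.format())

# Mixing literal braces with a (hypothetical) placeholder named post_id.
mixed = "{{'filters': [{{'id': {post_id}}}]}}"
rendered = mixed.format(post_id=42)
print(rendered)
```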
Legacy syntax: resolve field in parameter configuration#
An alternative, legacy way to define resource relationships is to use the resolve field in the parameter configuration.
Here's the same example as above that uses the resolve field:
{
"resources": [
{
"name": "issues",
"endpoint": {
"path": "issues",
# ...
},
},
{
"name": "issue_comments",
"endpoint": {
"path": "issues/{issue_number}/comments",
"params": {
"issue_number": {
"type": "resolve",
"resource": "issues",
"field": "number",
}
},
},
"include_from_parent": ["id"],
},
],
}
The syntax for the resolve field in parameter configuration is:
{
"<parameter_name>": {
"type": "resolve",
"resource": "<parent_resource_name>",
"field": "<parent_resource_field_name_or_jsonpath>",
}
}
The field value can be specified as a JSONPath to select a nested field in the parent resource data. For example: "field": "items[0].id".
Resolving multiple path parameters from a parent resource#
When a child resource depends on multiple fields from a single parent resource, you can define multiple resolve parameters in the endpoint configuration. For example:
{
"resources": [
"groups",
{
"name": "users",
"endpoint": {
"path": "groups/{group_id}/users",
"params": {
"group_id": {
"type": "resolve",
"resource": "groups",
"field": "id",
},
},
},
},
{
"name": "user_details",
"endpoint": {
"path": "groups/{group_id}/users/{user_id}/details",
"params": {
"group_id": {
"type": "resolve",
"resource": "users",
"field": "group_id",
},
"user_id": {
"type": "resolve",
"resource": "users",
"field": "id",
},
},
},
},
],
}
In the configuration above:
- The users resource depends on the groups resource, resolving the group_id parameter from the id field in groups.
- The user_details resource depends on the users resource, resolving both group_id and user_id parameters from fields in users.
Include fields from the parent resource#
You can include data from the parent resource in the child resource by using the include_from_parent field in the resource configuration. For example:
{
"name": "issue_comments",
"endpoint": {
...
},
"include_from_parent": ["id", "title", "created_at"],
}
This will include the id, title, and created_at fields from the issues resource in the issue_comments resource data. The names of the included fields will be prefixed with the parent resource name and an underscore (_) like so: _issues_id, _issues_title, _issues_created_at.
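The naming scheme can be sketched in a few lines of plain Python. The with_parent_fields helper below is hypothetical and only reproduces the prefixing rule described above on sample data; it is not part of dlt.

```python
from typing import Any, Dict, List


def with_parent_fields(
    child: Dict[str, Any],
    parent: Dict[str, Any],
    parent_name: str,
    fields: List[str],
) -> Dict[str, Any]:
    """Copy selected parent fields into the child record under _<parent>_<field>."""
    extra = {f"_{parent_name}_{field}": parent[field] for field in fields}
    return {**child, **extra}


issue = {"id": 1, "number": 123, "title": "Bug", "created_at": "2024-01-26"}
comment = {"id": 900, "body": "Looks good"}

row = with_parent_fields(comment, issue, "issues", ["id", "title", "created_at"])
print(row)
```

Note that the child's own id field is untouched because the parent's id arrives under the prefixed name.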
Parallelize dependent resource fetching#
By default, dependent resources fetch child data for each parent item sequentially. You can set parallelized to True on a dependent resource to fetch child data concurrently using dlt's thread pool:
{
"name": "issue_comments",
"parallelized": True,
"endpoint": {
"path": "issues/{resources.issues.number}/comments",
},
"include_from_parent": ["id"],
}
See Per-resource parallelization for more details.
Define a resource which is not a REST endpoint#
Sometimes, we want to request endpoints with specific values that are not returned by another endpoint.
Thus, you can also include arbitrary dlt resources in your RESTAPIConfig instead of defining a resource for every path!
In the following example, we want to load the issues belonging to three repositories.
Instead of defining three different issues resources, one for each of the paths dlt-hub/dlt/issues/, dlt-hub/verified-sources/issues/, dlt-hub/dlthub-education/issues/, we have a resource repositories which yields a list of repository names that will be fetched by the dependent resource issues.
from typing import Any, Dict, Generator, List

import dlt
from dlt.sources.rest_api import RESTAPIConfig

@dlt.resource()
def repositories() -> Generator[List[Dict[str, Any]], Any, Any]:
    """A seed list of repositories to fetch"""
    yield [{"name": "dlt"}, {"name": "verified-sources"}, {"name": "dlthub-education"}]

config: RESTAPIConfig = {
"client": {"base_url": "https://github.com/api/v2"},
"resources": [
{
"name": "issues",
"endpoint": {
"path": "dlt-hub/{repository}/issues/",
"params": {
"repository": {
"type": "resolve",
"resource": "repositories",
"field": "name",
},
},
},
},
repositories(),
],
}
Be careful that the parent resource needs to return Generator[List[Dict[str, Any]]]. Thus, the following will NOT work:
@dlt.resource
def repositories() -> Generator[Dict[str, Any], Any, Any]:
    """Not working seed list of repositories to fetch"""
    yield from [{"name": "dlt"}, {"name": "verified-sources"}, {"name": "dlthub-education"}]
Processing steps: filter and transform data#
The processing_steps field in the resource configuration allows you to apply transformations to the data fetched from the API before it is loaded into your destination. This is useful when you need to filter out certain records, modify the data structure, or anonymize sensitive information.
Each processing step is a dictionary specifying the type of operation (filter, map or yield_map) and the function to apply. Steps apply in the order they are listed.
Quick example#
import copy

def lower_title(record):
    record["title"] = record["title"].lower()
    return record

def flatten_reactions(post):
    post_without_reactions = copy.deepcopy(post)
    post_without_reactions.pop("reactions")
    for reaction in post["reactions"]:
        yield {"reaction": reaction, **post_without_reactions}

config: RESTAPIConfig = {
"client": {
"base_url": "https://api.example.com",
},
"resources": [
{
"name": "posts",
"processing_steps": [
{"filter": lambda x: x["id"] < 10},
{"map": lower_title},
{"yield_map": flatten_reactions},
],
},
],
}
In the example above:
- First, the filter step uses a lambda function to include only records where id is less than 10.
- Then, the map step applies the lower_title function to each remaining record.
- Finally, the yield_map step applies the flatten_reactions function to each transformed record, yielding a set of records, one for each reaction for the given post.
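To see the ordering in action without calling an API, the steps can be chained over an in-memory list. The apply_steps function below is a hypothetical stand-in that interprets the same step dictionaries; it is a sketch of the semantics described above, not dlt's implementation.

```python
import copy
from typing import Any, Callable, Dict, Iterable, Iterator, List

Record = Dict[str, Any]


def _flat_map(func: Callable[[Record], Iterable[Record]], stream: Iterable[Record]) -> Iterator[Record]:
    """Yield every record produced by func for each input record."""
    for record in stream:
        yield from func(record)


def apply_steps(records: Iterable[Record], steps: List[Dict[str, Callable]]) -> List[Record]:
    """Apply filter, map, and yield_map steps in the order they are listed."""
    stream: Iterable[Record] = iter(records)
    for step in steps:
        ((kind, func),) = step.items()  # each step holds exactly one operation
        if kind == "filter":
            stream = filter(func, stream)
        elif kind == "map":
            stream = map(func, stream)
        elif kind == "yield_map":
            stream = _flat_map(func, stream)
        else:
            raise ValueError(f"unknown step type: {kind}")
    return list(stream)


def lower_title(record: Record) -> Record:
    record["title"] = record["title"].lower()
    return record


def flatten_reactions(post: Record) -> Iterator[Record]:
    post_without_reactions = copy.deepcopy(post)
    post_without_reactions.pop("reactions")
    for reaction in post["reactions"]:
        yield {"reaction": reaction, **post_without_reactions}


posts = [
    {"id": 1, "title": "Hello World", "reactions": ["+1", "heart"]},
    {"id": 12, "title": "Filtered Out", "reactions": ["+1"]},
]
result = apply_steps(
    posts,
    [
        {"filter": lambda x: x["id"] < 10},
        {"map": lower_title},
        {"yield_map": flatten_reactions},
    ],
)
print(result)
```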
Using filter#
The filter step allows you to exclude records that do not meet certain criteria. The provided function should return True to keep the record or False to exclude it:
{
"name": "posts",
"endpoint": "posts",
"processing_steps": [
{"filter": lambda x: x["id"] in [10, 20, 30]},
],
}
In this example, only records with id equal to 10, 20, or 30 will be included.
Using map#
The map step allows you to modify the records fetched from the API. The provided function should take a record as an argument and return the modified record. For example, to anonymize the email field:
def anonymize_email(record):
    record["email"] = "REDACTED"
    return record

config: RESTAPIConfig = {
"client": {
"base_url": "https://api.example.com",
},
"resources": [
{
"name": "users",
"processing_steps": [
{"map": anonymize_email},
],
},
],
}
Using yield_map#
The yield_map step allows you to transform a record into multiple records. The provided function should take a record as an argument and return an iterator of records. For example, to flatten the reactions field:
import copy

def flatten_reactions(post):
    post_without_reactions = copy.deepcopy(post)
    post_without_reactions.pop("reactions")
    for reaction in post["reactions"]:
        yield {"reaction": reaction, **post_without_reactions}

config: RESTAPIConfig = {
"client": {
"base_url": "https://api.example.com",
},
"resources": [
{
"name": "posts",
"processing_steps": [
{"yield_map": flatten_reactions},
],
},
],
}
Combining filter and map#
You can combine multiple processing steps to achieve complex transformations:
{
"name": "posts",
"endpoint": "posts",
"processing_steps": [
{"filter": lambda x: x["id"] < 10},
{"map": lower_title},
{"filter": lambda x: "important" in x["title"]},
],
}
Incremental loading#
Some APIs provide a way to fetch only new or changed data (most often by using a timestamp field like updated_at, created_at, or incremental IDs).
This is called incremental loading and is very useful as it allows you to reduce the load time and the amount of data transferred.
Let's continue with our imaginary blog API example to understand incremental loading with query parameters.
Imagine we have the following endpoint https://api.example.com/posts and it:
- Accepts a created_since query parameter to fetch blog posts created after a certain date.
- Returns a list of posts with the created_at field for each post.
For example, if we query the endpoint with GET request https://api.example.com/posts?created_since=2024-01-25, we get the following response:
{
"results": [
{"id": 1, "title": "Post 1", "created_at": "2024-01-26"},
{"id": 2, "title": "Post 2", "created_at": "2024-01-27"},
{"id": 3, "title": "Post 3", "created_at": "2024-01-28"}
]
}
When the API endpoint supports incremental loading, you can configure dlt to load only the new or changed data using one of these three methods:
- Using placeholders for incremental loading
- Defining a special parameter in the params section of the endpoint configuration (DEPRECATED)
- Using the incremental field in the endpoint configuration with the start_param field (DEPRECATED)
Using placeholders for incremental loading#
The most flexible way to configure incremental loading is to use placeholders in the request configuration along with the incremental section.
Here's how it works:
- Define the incremental section in the endpoint configuration to specify the cursor path (where to find the incremental value in the response) and initial value (the value to start the incremental loading from).
- Use the placeholder {incremental.start_value} in the request configuration to reference the incremental value.
Let's take the example from the previous section and configure it using placeholders:
{
"path": "posts",
"data_selector": "results",
"params": {
"created_since": "{incremental.start_value}", # Uses cursor value in query parameter
},
"incremental": {
"cursor_path": "created_at",
"initial_value": "2024-01-25T00:00:00Z",
},
}
When you first run this pipeline, dlt will:
- Replace {incremental.start_value} with 2024-01-25T00:00:00Z (the initial value)
- Make a GET request to https://api.example.com/posts?created_since=2024-01-25T00:00:00Z
- Parse the response (e.g., posts with created_at values like "2024-01-26", "2024-01-27", "2024-01-28")
- Track the maximum value found in the "created_at" field (in this case, "2024-01-28")
On the next pipeline run, dlt will:
- Replace {incremental.start_value} with "2024-01-28" (the last seen maximum value)
- Make a GET request to https://api.example.com/posts?created_since=2024-01-28
- The API will only return posts created on or after January 28th
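The two runs described above can be mimicked in a few lines of plain Python. This is only an illustrative sketch of the cursor-tracking idea, not dlt's actual implementation: the names fake_api, run_once, and state are hypothetical, and the fake API is assumed to filter with "on or after" semantics.

```python
# Illustrative simulation only: fake_api, run_once, and state are
# hypothetical names and are not part of dlt.
POSTS = [
    {"id": 1, "title": "Post 1", "created_at": "2024-01-26"},
    {"id": 2, "title": "Post 2", "created_at": "2024-01-27"},
    {"id": 3, "title": "Post 3", "created_at": "2024-01-28"},
]


def fake_api(created_since):
    """Return posts created on or after the given date.

    ISO 8601 strings sort chronologically, so string comparison is enough here.
    """
    return {"results": [p for p in POSTS if p["created_at"] >= created_since]}


def run_once(state, initial_value):
    """Resolve the start value, request data, and remember the maximum cursor value."""
    start_value = state.get("last_value", initial_value)
    items = fake_api(created_since=start_value)["results"]
    if items:
        state["last_value"] = max(item["created_at"] for item in items)
    return start_value, items


state = {}
first_start, first_items = run_once(state, "2024-01-25T00:00:00Z")
second_start, second_items = run_once(state, "2024-01-25T00:00:00Z")
```

In this sketch the second run starts from the tracked maximum rather than the initial value. Because the assumed filter is inclusive, the post created on January 28th is returned again; see the incremental loading guide for how dlt treats items at the boundary.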
Let's break down the configuration:
- We explicitly set data_selector to "results" to select the list of posts from the response. This is optional; if not set, dlt will try to auto-detect the data location.
- We define the created_since parameter in the params section and use the placeholder {incremental.start_value} to reference the incremental value.
Placeholders are versatile and can be used in various request components. Here are some examples:
In JSON body (for POST requests)#
If the API lets you filter the data by a range of dates (e.g. fromDate and toDate), you can use the placeholder in the JSON body:
{
"path": "posts/search",
"method": "POST",
"json": {
"filters": {
"fromDate": "{incremental.start_value}", # In JSON body
"toDate": "2024-03-25"
},
"limit": 1000
},
"incremental": {
"cursor_path": "created_at",
"initial_value": "2024-01-25T00:00:00Z",
},
}
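If the upper bound should not be hard-coded in the body, one option may be to take it from the incremental section as well. The fragment below is a sketch under two assumptions: that the {incremental.end_value} placeholder (listed in the placeholder table later in this section) is resolved inside the JSON body the same way as {incremental.start_value}, and that the end_value date shown is just an example.

```python
# Sketch only: both bounds of the date range come from the incremental section.
# Assumption: {incremental.end_value} is resolved inside the JSON body.
endpoint = {
    "path": "posts/search",
    "method": "POST",
    "json": {
        "filters": {
            "fromDate": "{incremental.start_value}",
            "toDate": "{incremental.end_value}",
        },
        "limit": 1000,
    },
    "incremental": {
        "cursor_path": "created_at",
        "initial_value": "2024-01-25T00:00:00Z",
        "end_value": "2024-03-25T00:00:00Z",  # example value
    },
}
```

Keeping both bounds in one place makes it easier to adjust the range later. Check the incremental loading guide for how a fixed end value affects state tracking.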
In path parameters#
Some APIs use path parameters to filter the data:
{
"path": "posts/since/{incremental.start_value}/list", # In URL path
"incremental": {
"cursor_path": "created_at",
"initial_value": "2024-01-25",
},
}
In request headers#
You can also use placeholders in request headers:
{
"path": "posts",
"headers": {
"X-Since-Timestamp": "{incremental.start_value}" # In custom header
},
"incremental": {
"cursor_path": "created_at",
"initial_value": "2024-01-25T00:00:00Z",
},
}
For more details on headers configuration and dynamic placeholders, see the headers configuration section.
You can also use different placeholder variants depending on your needs:
| Placeholder | Description |
|---|---|
| {incremental.start_value} | The value to use as the starting point for this request (either the initial value or the last tracked maximum value) |
| {incremental.initial_value} | Always uses the initial value specified in the configuration |
| {incremental.last_value} | The last seen value (same as start_value in most cases, see the incremental loading guide for more details) |
| {incremental.end_value} | The end value if specified in the configuration |
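To get a feel for how these placeholders expand, the snippet below resolves a few templates with Python's built-in str.format, which supports attribute access inside replacement fields. It is a stand-alone illustration: the incremental object is a hypothetical stand-in with example values, and dlt's own resolution logic may differ.

```python
from types import SimpleNamespace

# Hypothetical stand-in for the incremental state; not dlt's own object.
incremental = SimpleNamespace(
    initial_value="2024-01-25",
    start_value="2024-01-28",
    last_value="2024-01-28",
    end_value="2024-02-25",
)

templates = {
    "query": "{incremental.start_value}",
    "path": "posts/since/{incremental.start_value}/list",
    "backfill_from": "{incremental.initial_value}",
    "backfill_to": "{incremental.end_value}",
}

# str.format looks up attributes named inside the braces.
resolved = {
    name: template.format(incremental=incremental)
    for name, template in templates.items()
}
```

With these example values, the path template expands to posts/since/2024-01-28/list, while the initial_value placeholder still yields 2024-01-25.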
Legacy method: Incremental loading in params (DEPRECATED)#
For query string parameters, you can also specify incremental loading directly in the params section:
{
"path": "posts",
"data_selector": "results", # Optional JSONPath to select the list of posts
"params": {
"created_since": {
"type": "incremental",
"cursor_path": "created_at", # The JSONPath to the field we want to track in each post
"initial_value": "2024-01-25",
},
},
}
Above, we define created_since as an incremental parameter:
{
"created_since": {
"type": "incremental",
"cursor_path": "created_at",
"initial_value": "2024-01-25",
},
}
The fields are:
- type: The type of the parameter definition. In this case, it must be set to incremental.
- cursor_path: The JSONPath to the field within each item in the list. The value of this field will be used in the next request. In the example above, our items look like {"id": 1, "title": "Post 1", "created_at": "2024-01-26"} so to track the created time, we set cursor_path to "created_at". Note that the JSONPath starts from the root of the item (dict) and not from the root of the response.
- initial_value: The initial value for the cursor. This is the value that will initialize the state of incremental loading. In this case, it's 2024-01-25. The value type should match the type of the field in the data item.
Incremental loading using the incremental field (DEPRECATED)#
Another alternative method is to use the incremental field in the endpoint configuration while specifying names of the query string parameters to be used as start and end conditions.
Let's take the same example as above and configure it using the incremental field:
{
"path": "posts",
"data_selector": "results",
"incremental": {
"start_param": "created_since",
"cursor_path": "created_at",
"initial_value": "2024-01-25",
},
}
The full available configuration for the incremental field is:
{
"incremental": {
"start_param": "<start_parameter_name>",
"end_param": "<end_parameter_name>",
"cursor_path": "<path_to_cursor_field>",
"initial_value": "<initial_value>",
"end_value": "<end_value>",
"convert": my_callable,
}
}
The fields are:
- start_param (str): The name of the query parameter to be used as the start condition. If we use the example above, it would be "created_since".
- end_param (str): The name of the query parameter to be used as the end condition. This is optional and can be omitted if you only need to track the start condition. This is useful when you need to fetch data within a specific range and the API supports end conditions (like the created_before query parameter).
- cursor_path (str): The JSONPath to the field within each item in the list. This is the field that will be used to track the incremental loading. In the example above, it's "created_at".
- initial_value (str): The initial value for the cursor. This is the value that will initialize the state of incremental loading.
- end_value (str): The end value for the cursor to stop the incremental loading. This is optional and can be omitted if you only need to track the start condition. If you set this field, initial_value needs to be set as well.
- convert (callable): A callable that converts the cursor value into the format that the query parameter requires. For example, a UNIX timestamp can be converted into an ISO 8601 date or a date can be converted into created_at+gt+{date}.
See the incremental loading guide for more details.
If you encounter issues with incremental loading, see the troubleshooting section in the incremental loading guide.
Convert the incremental value before calling the API#
If you need to transform the values in the cursor field before passing them to the API endpoint, you can specify a callable under the key convert. For example, the API might return UNIX epoch timestamps but expects to be queried with an ISO 8601 date. To achieve that, we can specify a function that converts from the date format returned by the API to the date format required for API requests.
In the following examples, 1704067200 is returned from the API in the field updated_at, but the API will be called with ?created_since=2024-01-01.
Incremental loading using the params field:
{
"created_since": {
"type": "incremental",
"cursor_path": "updated_at",
"initial_value": "1704067200",
"convert": lambda epoch: pendulum.from_timestamp(int(epoch)).to_date_string(),
}
}
Incremental loading using the incremental field:
{
"path": "posts",
"data_selector": "results",
"incremental": {
"start_param": "created_since",
"cursor_path": "updated_at",
"initial_value": "1704067200",
"convert": lambda epoch: pendulum.from_timestamp(int(epoch)).to_date_string(),
},
}
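The lambdas above rely on pendulum, which has to be imported in the module that defines the configuration. As a hedged alternative, the same conversion can be written with the standard library only. The function names epoch_to_date and epoch_to_filter are hypothetical, and the second one merely illustrates the created_at+gt+{date} style mentioned earlier.

```python
from datetime import datetime, timezone


def epoch_to_date(epoch):
    """Convert a UNIX epoch (str or int) to an ISO 8601 date string in UTC."""
    return datetime.fromtimestamp(int(epoch), tz=timezone.utc).date().isoformat()


def epoch_to_filter(epoch):
    """Build a filter expression in the style of created_at+gt+{date}."""
    return f"created_at+gt+{epoch_to_date(epoch)}"


# Either function could be passed under the "convert" key instead of the lambda.
example_date = epoch_to_date("1704067200")
example_filter = epoch_to_filter(1704067200)
```

Named functions are easier to unit test than inline lambdas, and pinning the time zone to UTC avoids dates shifting with the machine's local time zone.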
Troubleshooting#
If you encounter issues while running the pipeline, enable logging for detailed information about the execution:
RUNTIME__LOG_LEVEL=INFO python my_script.py
This also provides details on the HTTP requests. If you want to see even more details, you can enable the HTTP error response bodies.
Viewing HTTP error response bodies#
By default, HTTP error response bodies are not included in error messages to keep logs concise. However, during development or debugging, you may want to see the full error response from the API.
To include error response bodies in logs and exceptions, add the following to your config.toml file:
[runtime]
http_show_error_body = true
Or set the equivalent environment variable:
export RUNTIME__HTTP_SHOW_ERROR_BODY=true
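When the pipeline is started from a notebook or another Python process where exporting shell variables is inconvenient, the same settings can be placed in the process environment from Python. This is a minimal sketch that assumes the variables are set before dlt resolves its configuration, that is, before the pipeline is created and run.

```python
import os

# Set these before creating and running the pipeline so that they are
# picked up when the configuration is resolved (assumption stated above).
os.environ["RUNTIME__LOG_LEVEL"] = "INFO"
os.environ["RUNTIME__HTTP_SHOW_ERROR_BODY"] = "true"
```

Remember to remove or guard these lines outside of development, since error bodies can make logs considerably more verbose.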
Automatic secret redaction in logs#
The REST API source automatically redacts sensitive query parameters in URLs when logging or raising errors. The following parameter names are automatically considered sensitive and will be replaced with ***:
- api_key, token, key, access_token, apikey, api-key, access-token
- secret, password, pwd, client_secret
- username, client_id
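To illustrate what this kind of redaction looks like, here is a small stand-alone sketch built on urllib.parse. It is not dlt's implementation: the helper name redact_url is hypothetical, and the case-insensitive matching is an assumption made for the example.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

# Parameter names taken from the list above.
SENSITIVE_PARAMS = {
    "api_key", "token", "key", "access_token", "apikey", "api-key", "access-token",
    "secret", "password", "pwd", "client_secret",
    "username", "client_id",
}


def redact_url(url):
    """Replace the values of sensitive query parameters with ***."""
    parts = urlsplit(url)
    query = [
        (name, "***" if name.lower() in SENSITIVE_PARAMS else value)
        for name, value in parse_qsl(parts.query, keep_blank_values=True)
    ]
    # safe="*" keeps the mask readable instead of percent-encoding it.
    return urlunsplit(parts._replace(query=urlencode(query, safe="*")))


redacted = redact_url("https://api.example.com/posts?api_key=abc123&sort=created_at")
```

If your API passes secrets under a parameter name that is not in the list, consider sending them in headers or through an authentication configuration instead, so they do not appear in URLs at all.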
Configuration issues#
Getting validation errors#
If you get a DictValidationException when running the pipeline, the source configuration is incorrect. The error message provides details on the issue, including the path to the field and the expected type.
For example, if you have a source configuration like this:
config: RESTAPIConfig = {
"client": {
# ...
},
"resources": [
{
"name": "issues",
"params": { # <- Wrong: this should be inside
"sort": "updated", # the endpoint field below
},
"endpoint": {
"path": "issues",
# "params": { # <- Correct configuration
# "sort": "updated",
# },
},
},
# ...
],
}
You will get an error like this:
dlt.common.exceptions.DictValidationException: In path .: field 'resources[0]'
expects the following types: str, EndpointResource. Provided value {'name': 'issues', 'params': {'sort': 'updated'},
'endpoint': {'path': 'issues', ... }} with type 'dict' is invalid with the following errors:
For EndpointResource: In path ./resources[0]: following fields are unexpected {'params'}
It means that in the first resource configuration (resources[0]), the params field should be inside the endpoint field.
Getting wrong data or no data#
If incorrect data is received from an endpoint, check the data_selector field in the endpoint configuration. Ensure the JSONPath is accurate and points to the correct data in the response body. rest_api attempts to auto-detect the data location, which may not always succeed. See the data selection section for more details.
Getting insufficient data or incorrect pagination#
Check the paginator field in the configuration. When not explicitly specified, the source tries to auto-detect the pagination method. If auto-detection fails, or the system is unsure, a warning is logged. For production environments, we recommend specifying an explicit paginator in the configuration. See the pagination section for more details. Some APIs may have non-standard pagination methods, and you may need to implement a custom paginator.
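As a reminder of what an explicit paginator looks like, the fragment below reuses the json_link paginator from the quick example at the top of this page. The next_url_path value is specific to that hypothetical blog API and needs to match the field in which your API returns the next page URL.

```python
# Client configuration fragment with an explicit paginator, taken from the
# quick example; "paging.next" is specific to the hypothetical blog API.
client = {
    "base_url": "https://api.example.com/",
    "paginator": {
        "type": "json_link",
        "next_url_path": "paging.next",
    },
}
```

Stating the paginator explicitly removes the dependency on auto-detection, so the pipeline does not change behavior if a response happens to look like a different pagination style.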
Incremental loading not working#
See the troubleshooting guide for incremental loading issues.
Getting HTTP 404 errors#
Some APIs may return 404 errors for resources that do not exist or have no data. Manage these responses by configuring the ignore action in response actions.
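A minimal sketch of such a configuration follows, using the comments endpoint from the blog API example. The response_actions entry shown here follows the form described in the response actions section; whether ignoring 404 responses is appropriate depends on your API.

```python
# Endpoint configuration fragment: treat 404 responses as "no data"
# instead of failing the pipeline run.
endpoint = {
    "path": "posts/{resources.posts.id}/comments",
    "response_actions": [
        {"status_code": 404, "action": "ignore"},
    ],
}
```

This is most useful for child resources, where a parent item with no children is a normal situation rather than an error.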
Authentication issues#
If you are experiencing 401 (Unauthorized) errors, this could indicate:
- Incorrect authorization credentials. Verify credentials in the secrets.toml. Refer to Secret and configs for more information.
- An incorrect authentication type. Consult the API documentation for the proper method. See the authentication section for details. For some APIs, a custom authentication method may be required.
General guidelines#
The rest_api source uses the RESTClient class for HTTP requests. Refer to the RESTClient troubleshooting guide for debugging tips.
For further assistance, join our Slack community. We're here to help!