index
Type: External
Status: Published
Created: Mar 3, 2026
Updated: May 19, 2026
Updated by: Dosu Bot

The filesystem source allows seamless loading of files from the following locations:

  • AWS S3
  • Google Cloud Storage
  • Google Drive
  • Azure Blob Storage
  • remote filesystem (via SFTP)
  • local filesystem
  • Public CDN

The filesystem source natively supports CSV, Parquet, and JSONL files and allows customization for loading any type of structured file.

To load unstructured data (PDF, plain text, e-mail), please refer to the unstructured data source.

How filesystem source works#

Besides loading data from remote and local files, the filesystem source provides tools for customizing the loading process to fit your specific needs.

Filesystem source loads data in two steps:

  1. It accesses the files in your remote or local file storage without actually reading the content yet. At this point, you can filter files by metadata or name. You can also set up incremental loading to load only new files.
  2. The reader reads the files' content and yields the records. At this step, you can filter the actual data, enrich records with metadata from files, or perform incremental loading based on the file content.

For the most common cases, we provide the readers source, which does the above in a single step.
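
The two steps can be pictured with plain Python generators. This is a minimal sketch and does not use dlt: the in-memory `BUCKET` and the `list_files` and `read_records` helpers are hypothetical stand-ins for real storage, the listing step, and a reader.

```python
import csv
import io
from typing import Dict, Iterator, List

# Hypothetical in-memory "bucket": file name -> CSV text. Stands in for real storage.
BUCKET: Dict[str, str] = {
    "cities_london.csv": "city,population\nLondon,8800000\n",
    "cities_berlin.csv": "city,population\nBerlin,3700000\n",
    "notes.txt": "not a csv file",
}


def list_files(bucket: Dict[str, str]) -> Iterator[Dict[str, object]]:
    """Step 1: yield file metadata only; content is not parsed here."""
    for name, text in bucket.items():
        yield {"file_name": name, "size_in_bytes": len(text.encode("utf-8"))}


def read_records(
    files: Iterator[Dict[str, object]], bucket: Dict[str, str]
) -> Iterator[Dict[str, str]]:
    """Step 2: open each selected file and yield its rows, enriched with the file name."""
    for item in files:
        name = str(item["file_name"])
        for row in csv.DictReader(io.StringIO(bucket[name])):
            row["source_file"] = name
            yield row


# Filter on metadata between the two steps, before any content is read.
csv_only = (f for f in list_files(BUCKET) if str(f["file_name"]).endswith(".csv"))
records: List[Dict[str, str]] = list(read_records(csv_only, BUCKET))
print(records)
```

Because the metadata filter runs before the reader, `notes.txt` is never opened.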

Quick example#

Let's see how to load a parquet file from a public website. The following example downloads a single file of yellow taxi trip records from the NYC Taxi & Limousine Commission website and loads it into DuckDB.

import datetime as dt

import dlt
from dlt.sources.filesystem import filesystem, read_parquet

filesystem_resource = filesystem(
  bucket_url="https://d37ci6vzurychx.cloudfront.net/trip-data",
  file_glob=f"yellow_tripdata_{(dt.datetime.now() - dt.timedelta(days=90)).strftime('%Y-%m')}.parquet",
)
filesystem_pipe = filesystem_resource | read_parquet()

# We load the data into the yellow_tripdata table
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe.with_name("yellow_tripdata"))
print(load_info)
print(pipeline.last_trace.last_normalize_info)

This section illustrates how to perform an efficient incremental load of Parquet files from a remote source, specifically an S3 bucket.

import dlt
from dlt.sources.filesystem import filesystem, read_parquet

filesystem_resource = filesystem(
  bucket_url="s3://my-bucket/files",
  file_glob="**/*.parquet",
  incremental=dlt.sources.incremental("modification_date")
)
filesystem_pipe = filesystem_resource | read_parquet()

# We load the data into the table_name table
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe.with_name("table_name"))
print(load_info)
print(pipeline.last_trace.last_normalize_info)

With the readers source:

import dlt
from dlt.sources.filesystem import readers

parquet_files = readers(
  bucket_url="s3://my-bucket/files",
  file_glob="**/*.parquet",
  incremental=dlt.sources.incremental("modification_date")
).read_parquet()

# We load the data into the table_name table
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(parquet_files.with_name("table_name"))
print(load_info)
print(pipeline.last_trace.last_normalize_info)

Setup#

Prerequisites#

Please make sure the dlt library is installed. Refer to the installation guide.

Initialize the filesystem source#

To get started with your data pipeline, follow these steps:

  1. Enter the following command:

    dlt init filesystem duckdb
    

    The dlt init command will initialize the pipeline example with the filesystem as the source and duckdb as the destination.

  2. If you would like to use a different destination, simply replace duckdb with the name of your
    preferred destination.

  3. After running this command, a new directory will be created with the necessary files and
    configuration settings to get started.

Configuration#

Get credentials#

To get AWS keys for S3 access:

  1. Access IAM in the AWS Console.
  2. Select "Users", choose a user, and open "Security credentials".
  3. Click "Create access key" to obtain the AWS access key ID and secret access key.

For more info, see AWS official documentation.

To get GCS/GDrive access:

  1. Log in to console.cloud.google.com.
  2. Create a service account.
  3. Enable "Cloud Storage API" / "Google Drive API"; see
    Google's guide.
  4. In IAM & Admin > Service Accounts, find your account, click the three-dot menu > "Manage Keys" >
    "ADD KEY" > "CREATE" to get a JSON credential file.
  5. Grant the service account appropriate permissions for cloud storage access.
  6. In the case of GDrive, share the respective folders/files with the service account.

For more info, see how to create a service account.

To obtain Azure blob storage access:

  1. Go to the Azure Portal (portal.azure.com).
  2. Select "Storage accounts" > your storage.
  3. Click "Settings" > "Access keys".
  4. View the account name and two keys (primary/secondary). Keep keys confidential.

For more info, see Azure official documentation.

For SFTP, dlt supports several authentication methods:

  1. Key-based authentication
  2. SSH Agent-based authentication
  3. Username/Password authentication
  4. GSS-API authentication

Learn more about SFTP authentication options in the SFTP section. To obtain credentials, contact your server administrator.

You don't need any credentials for the local filesystem.

Add credentials to dlt pipeline#

To provide credentials to the filesystem source, you can use any method available in dlt.
One of the easiest ways is to use configuration files. The .dlt folder in your working directory contains two files: config.toml and secrets.toml. Sensitive information, like passwords and access tokens, should only be put into secrets.toml, while any other configuration, like the path to a bucket, can be specified in config.toml.

AWS S3:

# secrets.toml
[sources.filesystem.credentials]
aws_access_key_id="Please set me up!"
aws_secret_access_key="Please set me up!"

# config.toml
[sources.filesystem]
bucket_url="s3://<bucket_name>/<path_to_files>/"

Azure Blob Storage:

# secrets.toml
[sources.filesystem.credentials]
azure_storage_account_name="Please set me up!"
azure_storage_account_key="Please set me up!"

# config.toml
[sources.filesystem]
bucket_url="az://<container_name>/<path_to_files>/"

GCS/GDrive:

# secrets.toml
[sources.filesystem.credentials]
client_email="Please set me up!"
private_key="Please set me up!"
project_id="Please set me up!"

# config.toml
# gdrive
[gdrive_pipeline_name.sources.filesystem]
bucket_url="gdrive://<folder_name>/<subfolder_or_file_path>/" # set file_glob="" if file path

# config.toml
# Google storage
[gstorage_pipeline_name.sources.filesystem]
bucket_url="gs://<bucket_name>/<path_to_files>/"

Learn how to set up SFTP credentials for each authentication method in the SFTP section.
For example, in the case of key-based authentication, you can configure the source the following way:

# secrets.toml
[sources.filesystem.credentials]
sftp_username = "foo"
sftp_key_filename = "/path/to/id_rsa" # Replace with the path to your private key file
sftp_key_passphrase = "your_passphrase" # Optional: passphrase for your private key

# config.toml
[sources.filesystem]
bucket_url = "sftp://[hostname]/[path]"

You can use both native local filesystem paths and the file:// URI. Absolute, relative, and UNC Windows paths are supported.

You could provide an absolute filepath:

# config.toml
[sources.filesystem]
bucket_url='file://Users/admin/Documents/csv_files'

Or skip the scheme and provide the local path in a format native to your operating system. For example, for Windows:

[sources.filesystem]
bucket_url='~\Documents\csv_files\'

You can also specify the credentials using environment variables. The variable name is derived from the corresponding key in the TOML file: replace the dots . with double underscores __ and write the name in upper case:

export SOURCES__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID="Please set me up!"
export SOURCES__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY="Please set me up!"
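
The naming rule can be written as a one-line transformation. The helper `toml_key_to_env_name` below is hypothetical and not part of dlt. It only illustrates the rule described above: dots become double underscores and the name is upper-cased.

```python
def toml_key_to_env_name(dotted_key: str) -> str:
    """Turn a dotted TOML key path into the matching environment variable name."""
    return dotted_key.replace(".", "__").upper()


name = toml_key_to_env_name("sources.filesystem.credentials.aws_access_key_id")
print(name)
```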

Usage#

The filesystem source is unusual in that it provides building blocks for loading data from files rather than a single fixed loader. First, it iterates over files in the storage, and then it processes each file to yield the records. Usually, you need two resources:

  1. The filesystem resource enumerates files in a selected bucket using a glob pattern, returning details as FileItem in customizable page sizes.
  2. One of the available transformer resources to process each file in a specific transforming function and yield the records.

1. Initialize a filesystem resource#

All parameters of the resource can be specified directly in code:

from dlt.sources.filesystem import filesystem

filesystem_source = filesystem(
  bucket_url="file://Users/admin/Documents/csv_files",
  file_glob="*.csv"
)

or taken from the config:

  • python code:

    from dlt.sources.filesystem import filesystem
    
    filesystem_source = filesystem()
    
  • configuration file:

    [sources.filesystem]
    bucket_url="file://Users/admin/Documents/csv_files"
    file_glob="*.csv"
    

Full list of filesystem resource parameters:

  • bucket_url - full URL of the bucket (could be a relative path in the case of the local filesystem).

  • credentials - cloud storage credentials or an AbstractFilesystem instance (should be empty for the local filesystem). We recommend not specifying this parameter in the code, but putting it in a secrets file instead.

  • file_glob - file filter in glob format. Defaults to listing all non-recursive files in the bucket URL.

  • files_per_page - number of files processed at once. The default value is 100.

  • extract_content - if True, the content of the file will be read and returned in the resource. The default value is False.
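
To get a feel for what a page size means, the sketch below splits a listing into pages of at most files_per_page entries. It is plain Python, not dlt code: the `paginate` helper is hypothetical, and how dlt batches listings internally is not described in this document.

```python
from typing import Iterator, List, Sequence


def paginate(file_names: Sequence[str], files_per_page: int = 100) -> Iterator[List[str]]:
    """Yield consecutive pages holding at most files_per_page names each."""
    if files_per_page < 1:
        raise ValueError("files_per_page must be at least 1")
    for start in range(0, len(file_names), files_per_page):
        yield list(file_names[start : start + files_per_page])


names = [f"file_{i:03d}.csv" for i in range(250)]
pages = list(paginate(names))
print([len(page) for page in pages])
```

With 250 files and the default page size of 100, the listing arrives in three pages, the last one partially filled.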

2. Choose the right reader#

The current implementation of the filesystem source natively supports three file types: CSV, Parquet, and JSONL.
You can apply any of the above or create your own readers. To apply the selected transformer resource, use pipe notation |:

from dlt.sources.filesystem import filesystem, read_csv

filesystem_pipe = filesystem(
  bucket_url="file://Users/admin/Documents/csv_files",
  file_glob="*.csv"
) | read_csv()

Available readers#

  • read_csv() - processes CSV files using Pandas. Control batch size with chunksize (defaults to 10000 rows). Accepts additional **pandas_kwargs passed to pd.read_csv().
  • read_jsonl() - processes JSONL files chunk by chunk. Control batch size with chunksize (defaults to 1000 lines per batch).
  • read_parquet() - processes Parquet files using PyArrow. Control memory usage with chunksize (defaults to 1000 rows per batch). Set use_pyarrow=True to yield native pyarrow.RecordBatch objects instead of Python dictionaries for zero-copy operations.
  • read_csv_duckdb() - processes CSV files using DuckDB, which usually shows better performance than Pandas. Control batch size with chunk_size (defaults to 5000 rows). Set use_pyarrow=True to yield Arrow format instead of JSON. Accepts additional **duckdb_kwargs passed to DuckDB's read_csv().

3. Create and run a pipeline#

import dlt
from dlt.sources.filesystem import filesystem, read_csv

filesystem_pipe = filesystem(bucket_url="file://Users/admin/Documents/csv_files", file_glob="*.csv") | read_csv()
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
info = pipeline.run(filesystem_pipe)
print(info)

For more information on how to create and run the pipeline, read the Walkthrough: Run a pipeline.

4. Apply hints#

import dlt
from dlt.sources.filesystem import filesystem, read_csv

filesystem_pipe = filesystem(bucket_url="file://Users/admin/Documents/csv_files", file_glob="*.csv") | read_csv()
# Tell dlt to merge on date
filesystem_pipe.apply_hints(write_disposition="merge", merge_key="date")

# We load the data into the table_name table
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe.with_name("table_name"))
print(load_info)

5. Incremental loading#

Here are a few simple ways to load your data incrementally:

  1. Load files based on modification date. Only load files that have been updated since the last time dlt processed them. dlt checks the files' metadata (like the modification date) and skips those that haven't changed.
  2. Load new records based on a specific column. You can load only the new or updated records by looking at a specific column, like updated_at. Unlike the first method, this approach would read all files every time and then filter the records which were updated.
  3. Combine loading only updated files and records. Finally, you can combine both methods. It could be useful if new records could be added to existing files, so you not only want to filter the modified files, but also the modified records.
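
The idea shared by these methods is a cursor that remembers the highest value seen so far. The sketch below is a simplified model in plain Python and is not dlt's implementation: `select_new` is a hypothetical helper, and the strict "greater than" comparison is an assumption made for brevity.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

Item = Dict[str, Any]


def select_new(
    items: List[Item], cursor_field: str, last_value: Optional[Any]
) -> Tuple[List[Item], Optional[Any]]:
    """Keep items whose cursor value is above last_value and return the advanced cursor."""
    fresh = [it for it in items if last_value is None or it[cursor_field] > last_value]
    new_last = max((it[cursor_field] for it in fresh), default=last_value)
    return fresh, new_last


files = [
    {"file_name": "a.csv", "modification_date": datetime(2024, 1, 1)},
    {"file_name": "b.csv", "modification_date": datetime(2024, 1, 2)},
]
# First run: no cursor yet, so every file is selected.
first_run, cursor = select_new(files, "modification_date", None)

# A new file appears; the second run selects only that file.
files.append({"file_name": "c.csv", "modification_date": datetime(2024, 1, 3)})
second_run, cursor = select_new(files, "modification_date", cursor)

print([f["file_name"] for f in first_run], [f["file_name"] for f in second_run])
```

The same function applied to records with an updated_at field models the second method; chaining both models the third.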

Load files based on modification date#

For example, to load only new CSV files, pass an incremental on modification_date to the filesystem resource:

import dlt
from dlt.sources.filesystem import filesystem, read_csv

# This configuration will only consider new CSV files
new_files = filesystem(
  bucket_url="s3://bucket_name",
  file_glob="directory/*.csv",
  incremental=dlt.sources.incremental("modification_date")
)

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run((new_files | read_csv()).with_name("csv_files"))
print(load_info)

Load new records based on a specific column#

In this example, we load only new records based on the field called updated_at. This method may be useful if you are not able to
filter files by modification date because, for example, all files are modified each time a new record appears.

import dlt
from dlt.sources.filesystem import filesystem, read_csv

# We consider all CSV files
all_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")

# But keep only updated records
filesystem_pipe = (all_files | read_csv())
filesystem_pipe.apply_hints(incremental=dlt.sources.incremental("updated_at"))
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)

Combine loading only updated files and records#

import dlt
from dlt.sources.filesystem import filesystem, read_csv

# This configuration will only consider modified CSV files
new_files = filesystem(
  bucket_url="s3://bucket_name",
  file_glob="directory/*.csv", incremental=dlt.sources.incremental("modification_date")
)

# And in each modified file, we keep only updated records
filesystem_pipe = (new_files | read_csv())
filesystem_pipe.apply_hints(incremental=dlt.sources.incremental("updated_at"))
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)

6. Split large incremental loads#

If you have many files to process, or the files are large, you can split pipeline runs into smaller chunks (a single file is the smallest possible chunk). There are two ways to do that:

  • Partitioning: split the source data into several ranges, load them (possibly in parallel), and then continue loading incrementally.
  • Split loading: load the data sequentially in small chunks.

Partitioning works as follows:

  1. Obtain a list of files, e.g., by listing your resource: files = list(filesystem(...)).
  2. Order the list by modification_date or file_url and split it into equal chunks.
  3. For each chunk, find the min and max of the range.
  4. Use incremental with end_value for the backfill.
  5. Load each partition in a loop or in parallel (e.g., in separate processes).
  6. Continue regular incremental loading with initial_value set to the value at the end of the last range (modification_date or file_url), and make the start of the range open to avoid duplicates.

import dlt
from dlt.sources.filesystem import filesystem

# placeholder bucket; replace with your own location
bucket_url = "s3://bucket_name"

# list and sort all csv files for deterministic partitioning
fs_ = filesystem(bucket_url=bucket_url, file_glob="**/*.csv")
# we assume that file paths are named so files added later in time come at the end when sorted
file_urls = sorted([file["file_url"] for file in fs_])

pipeline = dlt.pipeline("test_partitioned_load", destination="duckdb")

# load each partition using initial_value and end_value
for i in range(len(file_urls) // 4 + 1):
    files_range = file_urls[i * 4 : (i + 1) * 4]
    if not files_range:
        continue

    # close both ranges to load inclusively
    file_name_incremental = dlt.sources.incremental(
        "file_url",
        initial_value=files_range[0],
        end_value=files_range[-1],
        range_start="closed",
        range_end="closed",
    )
    file_resource = filesystem(
        bucket_url=bucket_url, file_glob="**/*.csv", incremental=file_name_incremental
    ).with_name("files")
    load_info = pipeline.run(file_resource)
    print(load_info)

# note we could also extract max modification_date and use it for subsequent incremental loading
file_name_incremental = dlt.sources.incremental(
    "file_url",
    initial_value=file_urls[-1],
    range_start="open",
)
file_resource = filesystem(
    bucket_url=bucket_url, file_glob="**/*.csv", incremental=file_name_incremental
).with_name("files")
# will write initial incremental state
pipeline.run(file_resource)

Please read the notes on parallelism.
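
The range arithmetic behind partitioning can be checked in isolation before touching any storage. The sketch below is plain Python with a hypothetical helper, `partition_bounds`. Each returned pair corresponds to the initial_value and end_value of one closed range, and the file URLs are made-up placeholders.

```python
from typing import List, Sequence, Tuple


def partition_bounds(sorted_values: Sequence[str], chunk_size: int) -> List[Tuple[str, str]]:
    """Split an already sorted sequence into chunks and return (min, max) per chunk."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    bounds = []
    for start in range(0, len(sorted_values), chunk_size):
        chunk = sorted_values[start : start + chunk_size]
        # The chunk is sorted, so its first and last items are its min and max.
        bounds.append((chunk[0], chunk[-1]))
    return bounds


# Placeholder URLs; zero-padded names keep lexical order equal to creation order.
urls = sorted(f"s3://bucket_name/csv/file_{i:02d}.csv" for i in range(10))
bounds = partition_bounds(urls, 4)
for first, last in bounds:
    print(first, "->", last)
```

Ten files in chunks of four give three ranges, the last one covering only two files. No file falls between two ranges because both ends are inclusive.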

Split loading works as follows:

  1. Use an incremental with row_order set.
  2. Limit the number of files returned per page when creating the filesystem instance to get manageable chunks.
  3. Limit the resource by number of pages or by time.
  4. Run the pipeline in a loop for as long as the load is not empty.

import dlt
from dlt.sources.filesystem import filesystem

# placeholder bucket; replace with your own location
bucket_url = "s3://bucket_name"
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")

# return files in order of modification_date
incremental_ = dlt.sources.incremental("modification_date", row_order="asc") # type: ignore
# each page contains only one file
fs_ = filesystem(bucket_url=bucket_url, file_glob="csv/*", incremental=incremental_, files_per_page=1)

# process one file in each run; you could also use max_time to process files, e.g., for an hour
while not pipeline.run(fs_.with_name("files").add_limit(1)).is_empty:
    print(pipeline.last_trace.last_load_info)
    print(pipeline.last_trace.last_load_info)

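Note that you must set row_order on the incremental so that no file is missed: each run stops after a limited number of pages, so files must arrive in ascending cursor order for the saved cursor value to stay behind the files that have not been processed yet.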
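
The effect of ordering can be shown with a toy model. The sketch below is not dlt code: integers stand in for modification dates, and `run_once` and `run_until_empty` are hypothetical helpers that mimic "take one item per run, then remember the highest value seen".

```python
from typing import List, Optional, Tuple


def run_once(listing: List[int], cursor: Optional[int], limit: int = 1) -> Tuple[List[int], Optional[int]]:
    """One simulated run: skip values at or below the cursor, take `limit` items, advance the cursor."""
    candidates = [value for value in listing if cursor is None or value > cursor]
    taken = candidates[:limit]
    return taken, max(taken, default=cursor)


def run_until_empty(listing: List[int]) -> List[int]:
    """Repeat runs until one of them loads nothing, collecting everything that was loaded."""
    loaded: List[int] = []
    cursor: Optional[int] = None
    while True:
        taken, cursor = run_once(listing, cursor)
        if not taken:
            return loaded
        loaded.extend(taken)


unordered = [3, 1, 2]
ordered_result = run_until_empty(sorted(unordered))
unordered_result = run_until_empty(unordered)
print(ordered_result, unordered_result)
```

With the unordered listing, the first run takes the newest item, the cursor jumps past the two older ones, and they are never loaded.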

7. Filter files#

If you need to filter out files based on their metadata, you can easily do this using the add_filter method.
Within your filtering function, you'll have access to any field of the FileItem representation.

Filter by name#

To keep only files that have London or Berlin in their names, you can do the following:

import dlt
from dlt.sources.filesystem import filesystem, read_csv

# Filter files accessing file_name field
filtered_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
filtered_files.add_filter(lambda item: ("London" in item["file_name"]) or ("Berlin" in item["file_name"]))

filesystem_pipe = (filtered_files | read_csv())
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)

Filter by size#

If for some reason you only want to load small files, you can also do that:

import dlt
from dlt.sources.filesystem import filesystem, read_csv

MAX_SIZE_IN_BYTES = 10

# Filter files accessing size_in_bytes field
filtered_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
filtered_files.add_filter(lambda item: item["size_in_bytes"] < MAX_SIZE_IN_BYTES)

filesystem_pipe = (filtered_files | read_csv())
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)

Standalone filesystem resource#

You can use the standalone filesystem resource to list files in cloud storage or a local filesystem. This allows you to customize file readers or manage files using fsspec.

import dlt
from dlt.sources.filesystem import filesystem

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
files = filesystem(bucket_url="s3://my_bucket/data", file_glob="csv_folder/*.csv")
pipeline.run(files)

The filesystem ensures consistent file representation across bucket types and offers methods to access and read data.

FileItem representation#

  • All dlt sources/resources that yield files follow the FileItem contract.
  • File content is typically not loaded (you can control it with the extract_content parameter of the filesystem resource). Instead, full file info and methods to access content are available.
  • Users can request an authenticated fsspec AbstractFileSystem instance.

FileItem fields#

  • file_url - complete URL of the file (e.g., s3://bucket-name/path/file). This field serves as a primary key.
  • file_name - name of the file from the bucket URL.
  • relative_path - set when globbing; the path of the file relative to the bucket_url argument.
  • mime_type - file's MIME type. It is sourced from the bucket provider or inferred from its extension.
  • modification_date - file's last modification time (format: pendulum.DateTime).
  • size_in_bytes - file size.
  • file_content - content, provided upon request.
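
As a reading aid, the fields above can be written down as a typed dictionary. This is a sketch only: `FileItemSketch` is a hypothetical name, the standard library datetime stands in for pendulum.DateTime, and the example values assume a bucket_url of s3://bucket-name/path.

```python
from datetime import datetime, timezone
from typing import Optional, TypedDict


class FileItemSketch(TypedDict):
    file_url: str  # complete URL; serves as the primary key
    file_name: str
    relative_path: str  # relative to the bucket_url argument
    mime_type: str
    modification_date: datetime  # pendulum.DateTime in the real item
    size_in_bytes: int
    file_content: Optional[bytes]  # only filled when content is requested


item: FileItemSketch = {
    "file_url": "s3://bucket-name/path/file.csv",
    "file_name": "file.csv",
    "relative_path": "file.csv",
    "mime_type": "text/csv",
    "modification_date": datetime(2024, 1, 1, tzinfo=timezone.utc),
    "size_in_bytes": 2048,
    "file_content": None,
}
print(sorted(item))
```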

File manipulation#

FileItem, backed by a dictionary implementation, offers these helpers:

  • read_bytes() - method, which returns the file content as bytes.
  • open() - method which provides a file object when opened.
  • fsspec - field, which gives access to an authorized AbstractFilesystem with standard fsspec methods (used as item.fsspec in the file copy example in this document).

Create your own readers#

Although the filesystem resource yields the files from cloud storage or a local filesystem, you need to apply a transformer resource to retrieve the records from files. dlt natively supports three file types: CSV, Parquet, and JSONL (more details in filesystem transformer resource).

But you can easily create your own. In order to do this, you just need a function that takes as input a FileItemDict iterator and yields a list of records (recommended for performance) or individual records.
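
The "yield a list of records" shape is easiest to see without any storage involved. The sketch below is plain Python over in-memory file objects: the key=value line format and the `read_key_value_lines` function are hypothetical. In a real pipeline, the function would be decorated with dlt.transformer and would open each FileItemDict, as in the Excel and XML examples in this document.

```python
import io
from typing import Dict, Iterable, Iterator, List


def read_key_value_lines(
    files: Iterable[io.StringIO], batch_size: int = 2
) -> Iterator[List[Dict[str, str]]]:
    """Parse 'key=value' lines and yield the records in lists of up to batch_size."""
    batch: List[Dict[str, str]] = []
    for file in files:
        for line in file:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            key, _, value = line.partition("=")
            batch.append({"key": key, "value": value})
            if len(batch) >= batch_size:
                yield batch
                batch = []
    if batch:
        # Flush the final, possibly shorter batch.
        yield batch


files = [io.StringIO("a=1\nb=2\nc=3\n"), io.StringIO("d=4\n")]
batches = list(read_key_value_lines(files))
print(batches)
```

Yielding lists lets downstream steps handle many records per call, which is why the list form is recommended for performance.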

Example: read data from Excel files#

The code below sets up a pipeline that reads from an Excel file using a standalone transformer:

from typing import Iterator

import dlt
from dlt.common.storages.fsspec_filesystem import FileItemDict
from dlt.common.typing import TDataItems
from dlt.sources.filesystem import filesystem

BUCKET_URL = "s3://my_bucket/data"

# Define a standalone transformer to read data from an Excel file.
@dlt.transformer
def read_excel(
    items: Iterator[FileItemDict], sheet_name: str
) -> Iterator[TDataItems]:
    # Import the required pandas library.
    import pandas as pd

    # Iterate through each file item.
    for file_obj in items:
        # Open the file object.
        with file_obj.open() as file:
            # Read from the Excel file and yield its content as dictionary records.
            yield pd.read_excel(file, sheet_name).to_dict(orient="records")

# Set up the pipeline to fetch a specific Excel file from a filesystem (bucket).
example_xls = filesystem(
    bucket_url=BUCKET_URL, file_glob="../directory/example.xlsx"
) | read_excel("example_table") # Pass the data through the transformer to read the "example_table" sheet.

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb", dataset_name="example_xls_data")
# Execute the pipeline and load the extracted data into the "duckdb" destination.
load_info = pipeline.run(example_xls.with_name("example_xls_data"))
# Print the loading information.
print(load_info)

Example: read data from XML files#

You can use any third-party library to parse an XML file (e.g., BeautifulSoup, pandas). In the following example, we will be using the xmltodict Python library.

from typing import Iterator

import dlt
from dlt.common.storages.fsspec_filesystem import FileItemDict
from dlt.common.typing import TDataItems
from dlt.sources.filesystem import filesystem

BUCKET_URL = "s3://my_bucket/data"

# Define a standalone transformer to read data from an XML file.
@dlt.transformer
def read_xml(items: Iterator[FileItemDict]) -> Iterator[TDataItems]:
    # Import the required xmltodict library.
    import xmltodict

    # Iterate through each file item.
    for file_obj in items:
        # Open the file object.
        with file_obj.open() as file:
            # Parse the file to dict records.
            yield xmltodict.parse(file.read())

# Set up the pipeline to fetch a specific XML file from a filesystem (bucket).
example_xml = filesystem(
    bucket_url=BUCKET_URL, file_glob="../directory/example.xml"
) | read_xml() # Pass the data through the transformer

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb", dataset_name="example_xml_data")
# Execute the pipeline and load the extracted data into the "duckdb" destination.
load_info = pipeline.run(example_xml.with_name("example_xml_data"))

# Print the loading information.
print(load_info)

Clean files after loading#

You can get an fsspec client from the filesystem resource after it has been extracted, e.g., to delete processed files. The filesystem module contains a convenient helper, fsspec_from_resource, that can be used as follows:

import dlt
from dlt.sources.filesystem import filesystem, read_csv
from dlt.sources.filesystem.helpers import fsspec_from_resource

# Get filesystem source.
gs_resource = filesystem("gs://ci-test-bucket/")
# Extract files.
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
pipeline.run(gs_resource | read_csv())
# Get fs client.
fs_client = fsspec_from_resource(gs_resource)
# Do any operation.
fs_client.ls("ci-test-bucket/standard_source/samples")

Copy files locally#

To copy files locally, add a step in the filesystem resource and then load the listing to the database:

import os

import dlt
from dlt.common.storages.fsspec_filesystem import FileItemDict
from dlt.sources.filesystem import filesystem

def _copy(item: FileItemDict) -> FileItemDict:
    # Instantiate fsspec and copy file
    dest_file = os.path.join("./local_folder", item["file_name"])
    # Create destination folder
    os.makedirs(os.path.dirname(dest_file), exist_ok=True)
    # Download file
    item.fsspec.download(item["file_url"], dest_file)
    # Return file item unchanged
    return item

BUCKET_URL = "gs://ci-test-bucket/"

# Use recursive glob pattern and add file copy step
downloader = filesystem(BUCKET_URL, file_glob="**").add_map(_copy)

# NOTE: You do not need to load any data to execute extract; below, we obtain
# a list of files in a bucket and also copy them locally
listing = list(downloader)
print(listing)
# Load the listing into the "listing" table
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(
    downloader.with_name("listing"), write_disposition="replace"
)
# Pretty print the information on data that was loaded
print(load_info)
print(pipeline.last_trace.last_normalize_info)

Troubleshoot#

Access extremely long file paths#

By default, Windows limits paths to roughly 260 characters (MAX_PATH). When you access a longer path, you'll see a FileNotFound exception.

To go over this limit, you can use extended paths, as in the example below.
Note that Python glob does not work with extended UNC paths, so you will not be able to use them.

[sources.filesystem]
bucket_url = '\\?\C:\a\b\c'

If you get an empty list of files#

If you are running a dlt pipeline with the filesystem source and get zero records, we recommend you check
the configuration of bucket_url and file_glob parameters.

For example, with Azure Blob Storage, people sometimes mistake the account name for the container name. Make sure you've set up a URL as "az://<container name>/".

Also, refer to the glob function documentation to configure the resource correctly. Use ** to include files recursively. Note that the local filesystem supports full Python glob functionality, while cloud storage supports a restricted fsspec version.
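
The difference between a flat and a recursive pattern can be tried out locally. The sketch below uses pathlib's glob from the standard library on a temporary directory as a stand-in. The file names are made up, and matching on cloud storage may differ because it goes through fsspec.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "nested" / "deeper").mkdir(parents=True)
    for rel in ("top.csv", "nested/mid.csv", "nested/deeper/low.csv"):
        (root / rel).write_text("id\n1\n")

    # "*.csv" only sees files directly under the root folder.
    flat = sorted(p.relative_to(root).as_posix() for p in root.glob("*.csv"))
    # "**/*.csv" also descends into every subfolder.
    recursive = sorted(p.relative_to(root).as_posix() for p in root.glob("**/*.csv"))

print(flat)
print(recursive)
```

If a pipeline returns no files, running the same pattern against a small local copy of the folder layout is a quick way to tell a pattern problem from a credentials or URL problem.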