Quickstart

In this quickstart, you’ll build a complete data pipeline that extracts data from the Chess.com API and loads it into a local DuckDB database. By the end, you’ll understand the core concepts and be ready to build your own pipelines.
This tutorial takes approximately 5-10 minutes to complete. You’ll create a working pipeline, run it, and query the results.

What You’ll Build

You’ll create a pipeline that:
  • Fetches player data from the Chess.com public API
  • Automatically infers the schema from the JSON response
  • Loads the data into DuckDB (a fast, embedded SQL database)
  • Can be queried immediately using SQL

Prerequisites

  • Python 3.9 or higher installed on your system
  • Basic familiarity with Python
  • A terminal or command prompt

Step 1: Install dlt

First, install dlt using pip. We'll also install the DuckDB extra, since DuckDB is the destination we'll load into.
pip install "dlt[duckdb]"
The [duckdb] extra installs the DuckDB adapter. dlt supports 20+ destinations including PostgreSQL, BigQuery, Snowflake, and more. Check the destinations documentation for the full list.
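
To confirm the installation worked in your current environment, you can print the installed version (the package exposes it as dlt.__version__):
python -c "import dlt; print(dlt.__version__)"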

Step 2: Create Your Pipeline Script

Create a new Python file called chess_pipeline.py and add the following code:
chess_pipeline.py
import dlt
from dlt.sources.helpers import requests

# Create a dlt pipeline that will load
# chess player data to the DuckDB destination
pipeline = dlt.pipeline(
    pipeline_name='chess_pipeline',
    destination='duckdb',
    dataset_name='player_data'
)

# Grab some player data from Chess.com API
data = []
for player in ['magnuscarlsen', 'rpragchess']:
    response = requests.get(f'https://api.chess.com/pub/player/{player}')
    response.raise_for_status()
    data.append(response.json())

# Extract, normalize, and load the data
info = pipeline.run(data, table_name='player')

print(info)
Let’s break down what this code does:
  • Create a pipeline: The dlt.pipeline() function creates a pipeline with a name, destination (DuckDB), and dataset name
  • Fetch data: We loop through player usernames and fetch their data from the Chess.com API
  • Load data: The pipeline.run() method automatically infers the schema, normalizes the JSON data, and loads it into the player table
  • Print info: The run info contains metadata about what was loaded
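
Passing a plain list of dicts to pipeline.run() is the simplest way to load data. For reusable pipelines you would typically wrap the extraction in a @dlt.resource, which lets you attach table hints directly to the function. A minimal sketch of the same pipeline in that style (the resource name chess_players is just an illustration):

import dlt
from dlt.sources.helpers import requests

@dlt.resource(table_name='player', write_disposition='replace')
def chess_players():
    # Yield one record per player; dlt infers the schema from the yielded dicts
    for player in ['magnuscarlsen', 'rpragchess']:
        response = requests.get(f'https://api.chess.com/pub/player/{player}')
        response.raise_for_status()
        yield response.json()

pipeline = dlt.pipeline(
    pipeline_name='chess_pipeline',
    destination='duckdb',
    dataset_name='player_data'
)
print(pipeline.run(chess_players()))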

Step 3: Run Your Pipeline

Execute your pipeline script:
python chess_pipeline.py
You should see output indicating the pipeline ran successfully, including information about the loaded data:
Pipeline chess_pipeline completed in 0.5s
1 load package(s) were loaded to destination duckdb and into dataset player_data
The duckdb destination used duckdb:////home/user/chess_pipeline.duckdb location to store data
Load package 1234567890.0 is LOADED and contains no failed jobs
Success! Your pipeline has loaded data into DuckDB. The database file chess_pipeline.duckdb was created in your current directory.
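
Because the data lives in an ordinary DuckDB file, you can also open it directly with the duckdb Python package (installed as part of the dlt[duckdb] extra) to list the tables dlt created, including its internal _dlt_* bookkeeping tables:

import duckdb

# Open the database file the pipeline created in the current directory
con = duckdb.connect('chess_pipeline.duckdb')
print(con.sql("SHOW ALL TABLES"))
con.close()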

Step 4: Query Your Data

Now let’s query the data you just loaded. Create a new file called query_data.py:
query_data.py
import dlt

# Connect to the same pipeline
pipeline = dlt.pipeline(
    pipeline_name='chess_pipeline',
    destination='duckdb',
    dataset_name='player_data'
)

# Query the data using SQL
with pipeline.sql_client() as client:
    # Execute a query
    with client.execute_query('SELECT * FROM player') as cursor:
        # Fetch all rows
        rows = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]
        
        # Print results
        print(f"Found {len(rows)} players:")
        print("\nColumns:", columns)
        for row in rows:
            print(dict(zip(columns, row)))
Run the query script:
python query_data.py
You’ll see the player data you loaded, including usernames, titles, followers, and more.
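
You can also run more targeted SQL through the same client. For example, assuming the username and followers columns mentioned above are present after normalization, you could extend query_data.py with:

with pipeline.sql_client() as client:
    with client.execute_query(
        "SELECT username, followers FROM player ORDER BY followers DESC"
    ) as cursor:
        # Print each player with their follower count, highest first
        for username, followers in cursor.fetchall():
            print(username, followers)
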
You can also use pandas to work with your data:
import dlt

pipeline = dlt.pipeline(
    pipeline_name='chess_pipeline',
    destination='duckdb',
    dataset_name='player_data'
)

# Get data as a pandas DataFrame
df = pipeline.dataset().player.df()
print(df)

Step 5: Understand What Happened

When you ran your pipeline, dlt automatically:
  1. Inferred the schema - Examined the JSON structure and determined table columns and data types
  2. Created tables - Set up the player table in DuckDB with the appropriate schema
  3. Normalized data - Converted the nested JSON into relational tables (if there were nested structures, dlt would create child tables)
  4. Loaded data - Inserted the player records into the database
  5. Tracked state - Saved pipeline metadata for incremental loading in future runs
All of this happened with just one function call: pipeline.run()!
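
You can inspect the schema dlt inferred at any time: the pipeline stores it alongside its state, and schema objects can be exported as YAML. A quick way to look at it, assuming the quickstart pipeline has already run:

import dlt

pipeline = dlt.pipeline(
    pipeline_name='chess_pipeline',
    destination='duckdb',
    dataset_name='player_data'
)

# Print the inferred schema, including the player table and its column types
print(pipeline.default_schema.to_pretty_yaml())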

Next Steps

Congratulations! You’ve built your first dlt pipeline. Here’s what you can explore next:

Core Concepts

Learn about pipelines, sources, resources, and destinations in depth

Incremental Loading

Load only new or changed data instead of full refreshes

Destinations

Explore 20+ supported destinations like BigQuery, Snowflake, and PostgreSQL

Verified Sources

Use pre-built sources for popular APIs and services

Common Patterns

Loading Different Data Types

dlt can load various data types beyond API responses:
import dlt

pipeline = dlt.pipeline(
    destination='duckdb',
    dataset_name='mydata'
)

# Load a list of dictionaries
data = [
    {'id': 1, 'name': 'Alice', 'email': 'alice@example.com'},
    {'id': 2, 'name': 'Bob', 'email': 'bob@example.com'}
]
pipeline.run(data, table_name='users')

# Load a pandas DataFrame
import pandas as pd
df = pd.DataFrame({'id': [1, 2], 'value': [100, 200]})
pipeline.run(df, table_name='metrics')

# Load from a generator function
def generate_data():
    for i in range(1000):
        yield {'id': i, 'value': i * 10}

pipeline.run(generate_data(), table_name='generated')
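
pipeline.run() appends by default, so running the users example above a second time would insert the same rows again. You can control this per call with write_disposition; for example, to overwrite the table on each run:

# Overwrite the users table instead of appending on re-runs
pipeline.run(data, table_name='users', write_disposition='replace')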

Using Different Destinations

Switch destinations by changing the destination parameter:
# Load to PostgreSQL
pipeline = dlt.pipeline(
    destination='postgres',
    dataset_name='mydata',
    credentials='postgresql://user:password@localhost:5432/mydb'
)

# Load to BigQuery
pipeline = dlt.pipeline(
    destination='bigquery',
    dataset_name='mydata',
    credentials='path/to/credentials.json'
)

# Load to Snowflake
pipeline = dlt.pipeline(
    destination='snowflake',
    dataset_name='mydata'
)

Incremental Loading

Load only new data on subsequent runs:
import dlt
from dlt.sources.helpers import requests

@dlt.resource(
    table_name='events',
    write_disposition='append',
    primary_key='event_id'
)
def get_events(updated_at=dlt.sources.incremental('updated_at')):
    # This will automatically track the last 'updated_at' value
    response = requests.get(
        'https://api.example.com/events',
        params={'since': updated_at.last_value}
    )
    yield response.json()

pipeline = dlt.pipeline(
    destination='duckdb',
    dataset_name='events_data'
)

# First run loads all data
# Subsequent runs only load new events
pipeline.run(get_events())
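
The resource above uses write_disposition='append', so every run adds the newly fetched rows. If the API can re-deliver updated versions of the same event, 'merge' may be a better fit: it upserts on the declared primary key. Only the decorator changes; a sketch:

@dlt.resource(
    table_name='events',
    write_disposition='merge',  # upsert rows that share the same event_id
    primary_key='event_id'
)
def get_events(updated_at=dlt.sources.incremental('updated_at')):
    response = requests.get(
        'https://api.example.com/events',
        params={'since': updated_at.last_value}
    )
    yield response.json()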

Troubleshooting

If Python cannot import dlt, make sure you've installed it in your current Python environment:
pip install "dlt[duckdb]"
If using a virtual environment, ensure it’s activated.
The Chess.com API is public and doesn't require authentication. If you get connection errors, check your network connection and try again; the dlt requests helper retries transient failures automatically.
If you get a “database is locked” error, make sure:
  • You’ve closed any previous connections to the database
  • No other process is accessing the .duckdb file
  • You’re using with pipeline.sql_client() as client: to ensure connections are properly closed
If dlt infers the wrong data types:
  • Provide explicit hints using the columns parameter (see the sketch after this list)
  • Define a custom schema
  • See the schema documentation for details
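
For example, a column hint can be passed straight to pipeline.run(); the last_online column name below is illustrative, and the hint forces it to be loaded as a timestamp:

pipeline.run(
    data,
    table_name='player',
    columns={'last_online': {'data_type': 'timestamp'}}  # illustrative column hint
)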

Learn More

Join Slack Community

Get help from thousands of dlt users and the core team

Browse Examples

Explore real-world pipeline examples and use cases

Full Documentation

Deep dive into all dlt features and capabilities

GitHub Repository

View source code, report issues, and contribute