
When fetching large amounts of data, the EarthScope SDK uses query plans to efficiently break down your request into multiple API calls. This guide explains how to use query plans for memory-efficient data processing.

Understanding Query Plans

A query plan transparently handles the complexity of splitting a large request into many smaller API calls, fetching them concurrently, and assembling the results. Creating a plan is cheap: no data is fetched until you execute it.

from earthscope_sdk import EarthScopeClient
import datetime as dt

client = EarthScopeClient()

# Create a query plan (no data fetched yet)
plan = client.data.gnss_observations(
    start_datetime=dt.datetime(2025, 1, 1),
    end_datetime=dt.datetime(2025, 12, 31),
    station_name="AC60",
    session_name="A",
)

# View the execution plan
plan.plan()
print(plan)  # Shows: GnssObservationsQueryPlan(requests=182, groups=182)

Working with Results

Query plans return Apache Arrow tables, which you can easily convert to Polars, Pandas, DuckDB, etc.

import polars as pl

# Fetch all data
table = plan.fetch()
df = pl.from_arrow(table)  # Zero-copy conversion!

# Or iterate through batches
for daily_table in plan.group_by_day():
    df = pl.from_arrow(daily_table)
    # Process df...

Memory-Efficient Data Access

For large datasets, you can control memory usage by iterating through data in chunks instead of loading everything at once.

Option 1: Load All Data at Once (Small Queries)

# Fast but memory-intensive - good for < 1 week of data
table = plan.fetch()  # Attempts to load entire result set into memory at once

When to use: Small queries (< 1 week of data), enough RAM available

Option 2: Iterate by Day (Medium Queries)

# Memory-efficient: process one day at a time
# Good for weeks to months of data when fetching data for multiple stations at once
for daily_table in plan.group_by_day():
    # Process this day's data
    process(daily_table)
    # Previous day's data can be garbage collected

When to use: Medium queries (weeks-months), limited RAM

Example: Calculate daily statistics without loading all data:

import polars as pl

daily_stats = []
for daily_table in plan.group_by_day():
    df = pl.from_arrow(daily_table)

    stats = {
        'date': df['timestamp'][0].date(),
        'observations': len(df),
        'unique_satellites': df['satellite'].n_unique(),
    }
    daily_stats.append(stats)

# View summary
pl.DataFrame(daily_stats)

Option 3: Iterate by Station (Multi-Station Queries)

Depending on the data product, grouping by station may make more sense than grouping by day. When grouping by station, each table returned by the iterator contains the entire time range for a single station.

# Useful when querying multiple stations
plan = client.data.gnss_observations(
    start_datetime=dt.datetime(2025, 1, 1),
    end_datetime=dt.datetime(2025, 1, 31),
    station_name=["AC60", "AB01", "AB02"],  # Multiple stations
    session_name="A",
)

# Process one station at a time
for station_table in plan.group_by_station():
    station_name = station_table["igs"][0]  # Get station name
    print(f"Processing {station_name}")
    process(station_table)

When to use: Multiple stations, station-specific processing

Option 4: Custom Grouping (Advanced)

You can also use plan.group_by() and plan.sort_by() to define query plan behavior for your own specific use case.

# Group by custom logic (e.g., by month)
plan.group_by(lambda req: req.period.start.strftime("%Y-%m"))

for batch in plan:
    # Process custom batches
    process(batch)

When to use: Custom workflows, specific processing requirements

Controlling Request Order

You can sort requests to control the order in which data is fetched and processed:

# Fetch data sorted by station name
plan.sort_by_station().group_by_day()

Note: sort_by() and group_by() are commutative; these produce the same result (data sorted within groups):

plan.sort_by_station().group_by_day()  # Same result as below
plan.group_by_day().sort_by_station()  # Same result as above

Within each group, requests will be sorted by the specified key.
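The commutativity can be seen with plain Python. This is a conceptual illustration, not the SDK's internals: requests are modeled as hypothetical `(station, day)` tuples, and sorting before grouping yields the same groups as grouping first and then sorting within each group.

```python
# Hypothetical request descriptors: (station, day)
requests = [("AB01", 2), ("AC60", 1), ("AB02", 2), ("AC60", 2), ("AB01", 1)]

def sort_then_group(reqs):
    # Sort by station first, then bucket by day (stable sort keeps order within buckets)
    groups = {}
    for r in sorted(reqs, key=lambda r: r[0]):
        groups.setdefault(r[1], []).append(r)
    return groups

def group_then_sort(reqs):
    # Bucket by day first, then sort each bucket's requests by station
    groups = {}
    for r in reqs:
        groups.setdefault(r[1], []).append(r)
    return {day: sorted(rs, key=lambda r: r[0]) for day, rs in groups.items()}

assert sort_then_group(requests) == group_then_sort(requests)
```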

Async Iteration

For maximum efficiency in async code, iterate asynchronously:

from earthscope_sdk import AsyncEarthScopeClient

async with AsyncEarthScopeClient() as client:
    plan = client.data.gnss_observations(
        start_datetime=dt.datetime(2025, 1, 1),
        end_datetime=dt.datetime(2025, 12, 31),
        station_name="AC60",
        session_name="A",
    ).group_by_day()

    async for table in plan:
        await process_async(table)

Safety Limits: Memory & Timeout

To prevent memory issues or runaway queries, you can configure limits on query plan execution:

Memory Limits

Prevent queries from consuming too much memory:

from earthscope_sdk.client.data_access.error import MemoryLimitExceededError

plan = client.data.gnss_observations(...)

try:
    # Limit to 500 MB
    table = plan.with_memory_limit(500_000_000).fetch()
except MemoryLimitExceededError as e:
    print(f"Hit limit after {e.successful_request_count}/{e.total_request_count} requests")
    # Use partial data
    if e.partial_table is not None:
        process_partial(e.partial_table)

Timeouts

Prevent queries from running too long:

from earthscope_sdk.client.data_access.error import TimeoutExceededError

plan = client.data.gnss_observations(...)

try:
    # Timeout after 30 seconds
    table = plan.with_timeout(30).fetch()
except TimeoutExceededError as e:
    print(f"Timeout after {e.timeout_seconds}s")
    print(f"Got {e.successful_request_count}/{e.total_request_count} requests")
    # Use partial data
    if e.partial_table is not None:
        process_partial(e.partial_table)

Combining Limits

You can use both limits together:

plan = client.data.gnss_observations(...)

# Limit both memory and time
table = (
    plan.with_timeout(30)  # 30 seconds
    .with_memory_limit(500_000_000)  # 500 MB
    .fetch()
)

# Whichever limit is hit first will raise

Setting Defaults

Set global defaults in your configuration to protect all queries:

from earthscope_sdk import EarthScopeClient
from earthscope_sdk.config.settings import SdkSettings

settings = SdkSettings(
    query_plan={
        "memory_limit_bytes": 500_000_000,  # 500 MB default
        "timeout_seconds": 30,  # 30 second timeout
    }
)

client = EarthScopeClient(settings=settings)

# All queries use these defaults
table = client.data.gnss_observations(...).fetch()

# Override per-query when needed
table = client.data.gnss_observations(...).with_memory_limit(1_000_000_000).fetch()

See Settings for more configuration options.

Performance Characteristics

The SDK automatically optimizes performance:

  1. Concurrent fetching: Requests within each batch are fetched concurrently

  2. Rate limiting: Automatically throttles to respect API limits (default: 100 concurrent, 150/sec)

  3. Retries: Failed requests are automatically retried with exponential backoff

  4. Connection pooling: HTTP connections are reused for efficiency
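The interplay of concurrency limiting and retries can be sketched in plain asyncio. This is a conceptual illustration of the pattern, not the SDK's actual implementation; the function names and defaults here are assumptions:

```python
import asyncio
import random

async def fetch_with_retry(fetch, attempts=4, base_delay=0.5):
    # Exponential backoff with jitter: wait ~base_delay * 2^attempt between tries
    for attempt in range(attempts):
        try:
            return await fetch()
        except Exception:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * 2**attempt * (1 + random.random()))

async def main():
    sem = asyncio.Semaphore(100)  # cap in-flight requests, mirroring the 100-concurrent default

    async def limited(fetch):
        async with sem:
            return await fetch_with_retry(fetch)

    async def fake_fetch():  # stand-in for one API call
        return "ok"

    return await asyncio.gather(*(limited(fake_fetch) for _ in range(5)))

print(asyncio.run(main()))
```

In the SDK itself this all happens behind the scenes; you only see the assembled Arrow tables.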

Choosing the Right Strategy

# Small query (< 1 week): Load everything
table = client.data.gnss_observations(...).fetch()

# Medium query (weeks-months): Group by day
for daily_batch in client.data.gnss_observations(...).group_by_day():
    process(daily_batch)

# Large query (months-years): Custom grouping
plan = client.data.gnss_observations(...)
plan.group_by(lambda req: req.period.start.strftime("%Y-%m"))
for monthly_batch in plan:
    process(monthly_batch)

# Multiple stations: Group by station
for station_batch in client.data.gnss_observations(...).group_by_station():
    process(station_batch)

Quick Reference

Common Patterns

# Pattern 1: Small dataset - load all at once
table = client.data.gnss_observations(...).fetch()
df = pl.from_arrow(table)

# Pattern 2: Medium dataset - process by day
for daily_table in client.data.gnss_observations(...).group_by_day():
    df = pl.from_arrow(daily_table)
    # Process df...

# Pattern 3: Large dataset - custom grouping
plan = client.data.gnss_observations(...)
plan.group_by(lambda req: req.period.start.month)
for monthly_table in plan:
    df = pl.from_arrow(monthly_table)
    # Process df...

# Pattern 4: Save to disk for later
import pyarrow.parquet as pq
for daily_table in plan.group_by_day():
    date = daily_table['timestamp'][0].as_py().date()
    pq.write_table(daily_table, f"data_{date}.parquet")

Next Steps