批量下载历史数据-自定义点位
以下载自定义经纬度点位2025年逐日气温、降雨数据为例
需求:下载一批自定义经纬度点位 2025 年逐日平均气温、降雨量数据。
历史数据模型:era5_seamless
支持的逐小时要素:era5历史数据
全部支持的逐日数据要素查看:逐日要素支持
全部支持的逐月数据要素查看:逐月要素支持
以 Python 代码为例,分四步完成:
点位格式说明
每个点位只有 lon(经度)和 lat(纬度)是必填项,id 和 name 均为可选。
| 字段 | 是否必填 | 说明 |
|---|---|---|
lon | 必填 | 经度 |
lat | 必填 | 纬度 |
id | 可选 | 自定义编号,用于结果关联,不填则自动生成 |
name | 可选 | 点位名称,仅用于标识 |
传入示例:
{
"type": "point",
"locations": [
{ "id": "1", "name": "北京", "lon": 116.4, "lat": 39.9 },
{ "lon": 121.5, "lat": 31.2 }
]
}
第一步:准备自定义点位列表
import time
import zipfile
import requests
API_KEY = "your_api_key"
BASE_URL = "https://api.mirror-earth.com"
# 自定义点位列表,支持格式:[id,name,lon,lat | id,lon,lat | lon,lat]
locations = [
{"id": "1", "name": "Point 1", "lon": 120.14, "lat": 32.55},
{"id": "2", "name": "Point 2", "lon": 22.14, "lat": 32.22},
]
print(f"共 {len(locations)} 个自定义点位")
第二步:创建批量下载任务
headers = {
"X-API-Key": API_KEY,
"Content-Type": "application/json",
}
payload = {
"domain": "era5_seamless",
"hourly": [],
"daily": ["temperature_2m_mean", "rain_sum"],
"monthly": [],
"time_range": {
"type": "date_range",
"start": "2025-01-01",
"end": "2025-12-31",
},
"area": {
"type": "point",
"locations": locations,
},
"timezone": "Asia/Shanghai",
"area_average": False,
}
res = requests.post(f"{BASE_URL}/api/tasks/export", json=payload, headers=headers)
res.raise_for_status()
task_id = res.json()["data"]["task_id"]
print(f"任务已创建,task_id: {task_id}")
第三步:轮询等待任务完成
output_file = None
while True:
res = requests.get(f"{BASE_URL}/api/tasks/{task_id}/status", headers=headers)
res.raise_for_status()
data = res.json()["data"]
status = data["status"]
progress = data.get("progress_percent", 0)
print(f"任务状态:{status},进度:{progress}%")
if status == "completed":
output_file = data["output_file"]
break
elif status in ("failed", "cancelled"):
raise RuntimeError(f"任务异常,状态:{status}")
time.sleep(10)
第四步:下载并解压任务结果
download_url = f"{BASE_URL}/user_downloads/{output_file}"
res = requests.get(download_url, headers=headers, stream=True)
res.raise_for_status()
zip_path = "custom_points_2025.zip"
with open(zip_path, "wb") as f:
for chunk in res.iter_content(chunk_size=8192):
f.write(chunk)
with zipfile.ZipFile(zip_path, "r") as z:
z.extractall("custom_points_2025")
print("下载完成,数据已解压到 custom_points_2025/")
完整代码
import time
import zipfile
import requests
API_KEY = "your_api_key"
BASE_URL = "https://api.mirror-earth.com"
headers = {
"X-API-Key": API_KEY,
"Content-Type": "application/json",
}
# ── 1. 准备自定义点位列表 ────────────────────────────────────────
# 支持格式:[id,name,lon,lat | id,lon,lat | lon,lat]
locations = [
{"id": "1", "name": "Point 1", "lon": 120.14, "lat": 32.55},
{"id": "2", "name": "Point 2", "lon": 22.14, "lat": 32.22},
]
print(f"共 {len(locations)} 个自定义点位")
# ── 2. 创建任务 ──────────────────────────────────────────────────
payload = {
"domain": "era5_seamless",
"hourly": [],
"daily": ["temperature_2m_mean", "rain_sum"],
"monthly": [],
"time_range": {
"type": "date_range",
"start": "2025-01-01",
"end": "2025-12-31",
},
"area": {
"type": "point",
"locations": locations,
},
"timezone": "Asia/Shanghai",
"area_average": False,
}
res = requests.post(f"{BASE_URL}/api/tasks/export", json=payload, headers=headers)
res.raise_for_status()
task_id = res.json()["data"]["task_id"]
print(f"任务已创建,task_id: {task_id}")
# ── 3. 轮询任务进度 ──────────────────────────────────────────────
output_file = None
while True:
res = requests.get(f"{BASE_URL}/api/tasks/{task_id}/status", headers=headers)
res.raise_for_status()
data = res.json()["data"]
status = data["status"]
progress = data.get("progress_percent", 0)
print(f"任务状态:{status},进度:{progress}%")
if status == "completed":
output_file = data["output_file"]
break
elif status in ("failed", "cancelled"):
raise RuntimeError(f"任务异常,状态:{status}")
time.sleep(10)
# ── 4. 下载并解压结果 ────────────────────────────────────────────
download_url = f"{BASE_URL}/user_downloads/{output_file}"
res = requests.get(download_url, headers=headers, stream=True)
res.raise_for_status()
zip_path = "custom_points_2025.zip"
with open(zip_path, "wb") as f:
for chunk in res.iter_content(chunk_size=8192):
f.write(chunk)
with zipfile.ZipFile(zip_path, "r") as z:
z.extractall("custom_points_2025")
print("下载完成,数据已解压到 custom_points_2025/")