Centralizing API Logic
When you fetch viral videos from 7 regions (six European markets plus the US), you end up calling the YouTube Data API from multiple scripts: the main fetcher, the virality scorer, the metadata enricher. At ViralVidVault, we built a Python client library that encapsulates retry logic, quota tracking, and region-aware caching.
The Client Core
import time
import json
import hashlib
import requests
from dataclasses import dataclass
from typing import Optional
@dataclass
class ClientConfig:
    """Settings shared by every YouTubeSDK instance.

    Holds the key pool used for rotation plus the request/caching knobs.
    """

    api_keys: list[str]  # Multiple keys for rotation
    base_url: str = "https://www.googleapis.com/youtube/v3"
    max_retries: int = 3  # attempts per logical request before giving up
    timeout: int = 15  # per-request HTTP timeout, in seconds
    quota_per_key: int = 10000  # quota budget assumed for each key
    cache_ttl: int = 1800  # 30 min
class QuotaExhaustedError(Exception):
    """Raised when every configured API key has spent its quota budget."""
    pass
class YouTubeSDK:
    """Multi-key YouTube client with automatic key rotation and caching.

    Quota is tracked per key in ``_quota_used``; when the active key runs
    out, ``_rotate_key`` switches to the next key with remaining budget.
    Successful responses are cached in memory for ``config.cache_ttl``
    seconds, keyed by endpoint + sorted query params.
    """

    def __init__(self, config: "ClientConfig"):
        self.config = config
        self.session = requests.Session()
        self._key_index = 0  # index into config.api_keys of the active key
        self._quota_used: dict[str, int] = {k: 0 for k in config.api_keys}
        # cache key -> (insert timestamp, response payload)
        self._cache: dict[str, tuple[float, dict]] = {}

    @property
    def _current_key(self) -> str:
        """The API key currently used for outgoing requests."""
        return self.config.api_keys[self._key_index]

    def _rotate_key(self) -> bool:
        """Switch to the next available API key. Returns False if all exhausted.

        Walks the key list circularly and stops (returning False) once the
        scan wraps back around to the key it started from.
        """
        original = self._key_index
        while True:
            self._key_index = (self._key_index + 1) % len(self.config.api_keys)
            if self._key_index == original:
                return False  # All keys exhausted
            if self._quota_used[self._current_key] < self.config.quota_per_key:
                return True

    def _cache_get(self, key: str) -> Optional[dict]:
        """Return a cached payload if present and fresh; evict stale entries."""
        if key in self._cache:
            ts, data = self._cache[key]
            if time.time() - ts < self.config.cache_ttl:
                return data
            del self._cache[key]  # expired -- drop so the cache doesn't grow
        return None

    def _request(self, endpoint: str, params: dict, cost: int = 1) -> dict:
        """Issue a GET against the API with caching, retries and key rotation.

        Args:
            endpoint: Path segment appended to ``config.base_url``.
            params: Query parameters. Not mutated -- the API key is added
                to a private copy (the original implementation injected the
                key into the caller's dict).
            cost: Quota units charged against the active key on success.

        Raises:
            QuotaExhaustedError: Every key's quota budget is spent.
            requests.HTTPError: A non-retryable HTTP error status.
            requests.ConnectionError | requests.Timeout: All retries failed.
        """
        # Cache key derives from the logical request only; the API key is
        # excluded so key rotation does not fragment the cache.
        cache_key = hashlib.md5(
            f"{endpoint}:{json.dumps(params, sort_keys=True)}".encode()
        ).hexdigest()
        cached = self._cache_get(cache_key)
        if cached is not None:  # an empty dict is still a valid cached response
            return cached

        last_error = None
        for attempt in range(self.config.max_retries):
            if self._quota_used[self._current_key] + cost > self.config.quota_per_key:
                if not self._rotate_key():
                    raise QuotaExhaustedError("All API keys exhausted")
            # Work on a copy so the caller's dict is never mutated.
            request_params = dict(params, key=self._current_key)
            try:
                resp = self.session.get(
                    f"{self.config.base_url}/{endpoint}",
                    params=request_params,
                    timeout=self.config.timeout,
                )
                if resp.status_code == 200:
                    data = resp.json()
                    self._quota_used[self._current_key] += cost
                    self._cache[cache_key] = (time.time(), data)
                    return data
                if resp.status_code == 403:
                    # Quota exceeded for this key, rotate.
                    # NOTE(review): 403 can also mean a forbidden request,
                    # not just quota -- treated as quota here, as before.
                    self._quota_used[self._current_key] = self.config.quota_per_key
                    if not self._rotate_key():
                        raise QuotaExhaustedError("All keys quota exceeded")
                    continue
                if resp.status_code in (429, 500, 503):
                    time.sleep(2 ** attempt)  # exponential backoff
                    continue
                resp.raise_for_status()  # anything else is not retryable
            except (requests.ConnectionError, requests.Timeout) as e:
                # Timeout was previously uncaught and aborted all retries.
                last_error = e
                time.sleep(2 ** attempt)
        raise last_error or RuntimeError(f"Failed after {self.config.max_retries} attempts")
European Region Methods
Build domain-specific methods for multi-region fetching:
EU_REGIONS = ["PL", "NL", "SE", "NO", "AT", "GB", "US"]
def get_trending(self, region: str = "GB", max_results: int = 25) -> list[dict]:
params = {
"part": "snippet,statistics,contentDetails",
"chart": "mostPopular",
"regionCode": region,
"maxResults": min(max_results, 50),
}
return self._request("videos", params).get("items", [])
def get_trending_all_regions(self, max_per_region: int = 25) -> dict[str, list]:
"""Fetch trending from all European regions."""
results = {}
for region in self.EU_REGIONS:
try:
results[region] = self.get_trending(region, max_per_region)
except QuotaExhaustedError:
print(f"Quota exhausted at region {region}")
break
return results
def get_video_batch(self, video_ids: list[str]) -> list[dict]:
"""Fetch details for up to 50 videos per call."""
all_items = []
for i in range(0, len(video_ids), 50):
batch = video_ids[i:i + 50]
params = {
"part": "snippet,statistics,contentDetails",
"id": ",".join(batch),
}
data = self._request("videos", params)
all_items.extend(data.get("items", []))
return all_items
@property
def total_quota_remaining(self) -> int:
return sum(
self.config.quota_per_key - used
for used in self._quota_used.values()
)
def quota_report(self) -> dict:
return {
key[:8] + "...": {
"used": self._quota_used[key],
"remaining": self.config.quota_per_key - self._quota_used[key],
}
for key in self.config.api_keys
}
Usage
# Build a client with several API keys so rotation can kick in under load.
client_config = ClientConfig(
    api_keys=["AIzaSy_key1", "AIzaSy_key2", "AIzaSy_key3"],
)
client = YouTubeSDK(client_config)

# Fetch all European regions
trending_by_region = client.get_trending_all_regions(max_per_region=25)
for region, videos in trending_by_region.items():
    print(f"[{region}] {len(videos)} videos")

# Check quota health
print(client.quota_report())
# {'AIzaSy_k...': {'used': 14, 'remaining': 9986}, ...}
Key Design Decisions
- Multi-key rotation: When one key hits its quota, automatically switch to the next
- Region-aware caching: Same region + same params = cache hit, avoiding duplicate calls
- Cost-aware requests: Search costs 100 units vs 1 for videos.list
- Graceful degradation: Partial results are better than a crash when quota runs out
This SDK powers every YouTube API interaction at ViralVidVault. The key rotation alone saved us from quota exhaustion during peak viral detection periods.
This article is part of the Building ViralVidVault series. Check out ViralVidVault to see these techniques in action.
Top comments (0)