On this tutorial, we info you via the design and efficiency of AsyncConfig, a recent, async-first configuration administration library for Python. We assemble it from the underside as a lot as help extremely efficient choices, along with type-safe dataclass-based configuration loading, a lot of configuration sources (equal to ambiance variables, recordsdata, and dictionaries), and scorching reloading using watchdog. With a transparent API and highly effective validation capabilities, AsyncConfig is sweet for every enchancment and manufacturing environments. All by this tutorial, we reveal its capabilities using simple, superior, and validation-focused use circumstances, all powered by asyncio to help non-blocking workflows.
import asyncio
import json
import os
import yaml
from pathlib import Path
from typing import Any, Dict, Non-compulsory, Variety, TypeVar, Union, get_type_hints
from dataclasses import dataclass, self-discipline, MISSING
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import logging
__version__ = "0.1.0"
__author__ = "AsyncConfig Workforce"
T = TypeVar('T')
logger = logging.getLogger(__name__)
We begin by importing necessary Python modules required for our configuration system. These embody asyncio for asynchronous operations, yaml and json for file parsing, dataclasses for structured configuration, and watchdog for heat reloading. We moreover define some metadata and prepare a logger to hint events all by the system.
class ConfigError(Exception):
"""Base exception for configuration errors."""
transfer
class ValidationError(ConfigError):
"""Raised when configuration validation fails."""
transfer
class LoadError(ConfigError):
"""Raised when configuration loading fails."""
transfer
@dataclass
class ConfigSource:
"""Represents a configuration provide with priority and reload capabilities."""
path: Non-compulsory[Path] = None
env_prefix: Non-compulsory[str] = None
information: Non-compulsory[Dict[str, Any]] = None
priority: int = 0
watch: bool = False
def __post_init__(self):
if self.path:
self.path = Path(self.path)
We define a hierarchy of custom-made exceptions to take care of completely totally different configuration-related errors, with ConfigError as the underside class and additional explicit ones, equal to ValidationError and LoadError, for centered troubleshooting. We moreover create a ConfigSource information class to characterize a single configuration provide, which could possibly be a file, ambiance variables, or a dictionary, and embody help for prioritization and non-obligatory scorching reloading.
class ConfigWatcher(FileSystemEventHandler):
"""File system event handler for configuration scorching reloading."""
def __init__(self, config_manager, paths: itemizing[Path]):
self.config_manager = config_manager
self.paths = {str(p.resolve()) for p in paths}
great().__init__()
def on_modified(self, event):
if not event.is_directory and event.src_path in self.paths:
logger.information(f"Configuration file modified: {event.src_path}")
asyncio.create_task(self.config_manager._reload_config())
We create the ConfigWatcher class by extending FileSystemEventHandler to permit scorching reloading of configuration recordsdata. This class shows specified file paths and triggers an asynchronous reload of the configuration by the associated supervisor every time a file is modified. This ensures our utility can adapt to configuration modifications in real-time without having a restart.
class AsyncConfigManager:
"""
Fashionable async configuration supervisor with kind safety and scorching reloading.
Choices:
- Async-first design
- Variety-safe configuration classes
- Setting variable help
- Scorching reloading
- Quite a few provide merging
- Validation with detailed error messages
"""
def __init__(self):
self.sources: itemizing[ConfigSource] = []
self.observers: itemizing[Observer] = []
self.config_cache: Dict[str, Any] = {}
self.reload_callbacks: itemizing[callable] = []
self._lock = asyncio.Lock()
def add_source(self, provide: ConfigSource) -> "AsyncConfigManager":
"""Add a configuration provide."""
self.sources.append(provide)
self.sources.sort(key=lambda x: x.priority, reverse=True)
return self
def add_file(self, path: Union[str, Path], priority: int = 0, watch: bool = False) -> "AsyncConfigManager":
"""Add a file-based configuration provide."""
return self.add_source(ConfigSource(path=path, priority=priority, watch=watch))
def add_env(self, prefix: str, priority: int = 100) -> "AsyncConfigManager":
"""Add ambiance variable provide."""
return self.add_source(ConfigSource(env_prefix=prefix, priority=priority))
def add_dict(self, information: Dict[str, Any], priority: int = 50) -> "AsyncConfigManager":
"""Add dictionary-based configuration provide."""
return self.add_source(ConfigSource(information=information, priority=priority))
async def load_config(self, config_class: Variety[T]) -> T:
"""Load and validate configuration proper right into a typed dataclass."""
async with self._lock:
config_data = await self._merge_sources()
attempt:
return self._validate_and_convert(config_data, config_class)
moreover Exception as e:
elevate ValidationError(f"Did not validate configuration: {e}")
async def _merge_sources(self) -> Dict[str, Any]:
"""Merge configuration from all sources based mostly totally on priority."""
merged = {}
for provide in reversed(self.sources):
attempt:
information = await self._load_source(provide)
if information:
merged.exchange(information)
moreover Exception as e:
logger.warning(f"Did not load provide {provide}: {e}")
return merged
async def _load_source(self, provide: ConfigSource) -> Non-compulsory[Dict[str, Any]]:
"""Load information from a single configuration provide."""
if provide.information:
return provide.information.copy()
if provide.path:
return await self._load_file(provide.path)
if provide.env_prefix:
return self._load_env_vars(provide.env_prefix)
return None
async def _load_file(self, path: Path) -> Dict[str, Any]:
"""Load configuration from a file."""
if not path.exists():
elevate LoadError(f"Configuration file not found: {path}")
attempt:
content material materials = await asyncio.to_thread(path.read_text)
if path.suffix.lower() == '.json':
return json.lots(content material materials)
elif path.suffix.lower() in ['.yml', '.yaml']:
return yaml.safe_load(content material materials) or {}
else:
elevate LoadError(f"Unsupported file format: {path.suffix}")
moreover Exception as e:
elevate LoadError(f"Did not load {path}: {e}")
def _load_env_vars(self, prefix: str) -> Dict[str, Any]:
"""Load ambiance variables with given prefix."""
env_vars = {}
prefix = prefix.increased() + '_'
for key, price in os.environ.objects():
if key.startswith(prefix):
config_key = key[len(prefix):].lower()
env_vars[config_key] = self._convert_env_value(price)
return env_vars
def _convert_env_value(self, price: str) -> Any:
"""Convert ambiance variable string to acceptable kind."""
if price.lower() in ('true', 'false'):
return price.lower() == 'true'
attempt:
if '.' in price:
return float(price)
return int(price)
moreover ValueError:
transfer
attempt:
return json.lots(price)
moreover json.JSONDecodeError:
transfer
return price
def _validate_and_convert(self, information: Dict[str, Any], config_class: Variety[T]) -> T:
"""Validate and convert information to the specified configuration class."""
if not hasattr(config_class, '__dataclass_fields__'):
elevate ValidationError(f"{config_class.__name__} needs to be a dataclass")
type_hints = get_type_hints(config_class)
field_values = {}
for field_name, field_info in config_class.__dataclass_fields__.objects():
if field_name in information:
field_value = information[field_name]
if hasattr(field_info.kind, '__dataclass_fields__'):
if isinstance(field_value, dict):
field_value = self._validate_and_convert(field_value, field_info.kind)
field_values[field_name] = field_value
elif field_info.default simply is not MISSING:
field_values[field_name] = field_info.default
elif field_info.default_factory simply is not MISSING:
field_values[field_name] = field_info.default_factory()
else:
elevate ValidationError(f"Required self-discipline '{field_name}' not current in configuration")
return config_class(**field_values)
async def start_watching(self):
"""Start watching configuration recordsdata for modifications."""
watch_paths = []
for provide in self.sources:
if provide.watch and provide.path:
watch_paths.append(provide.path)
if watch_paths:
observer = Observer()
watcher = ConfigWatcher(self, watch_paths)
for path in watch_paths:
observer.schedule(watcher, str(path.guardian), recursive=False)
observer.start()
self.observers.append(observer)
logger.information(f"Started watching {len(watch_paths)} configuration recordsdata")
async def stop_watching(self):
"""Stop watching configuration recordsdata."""
for observer in self.observers:
observer.stop()
observer.be part of()
self.observers.clear()
async def _reload_config(self):
"""Reload configuration from all sources."""
attempt:
self.config_cache.clear()
for callback in self.reload_callbacks:
await callback()
logger.information("Configuration reloaded effectively")
moreover Exception as e:
logger.error(f"Did not reload configuration: {e}")
def on_reload(self, callback: callable):
"""Register a callback to be often called when configuration is reloaded."""
self.reload_callbacks.append(callback)
async def __aenter__(self):
await self.start_watching()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.stop_watching()
We now implement the core of our system by the AsyncConfigManager class. It acts as a result of the central controller for all configuration operations, together with sources (recordsdata, ambiance variables, dictionaries), merging them by priority, loading recordsdata asynchronously, and validating in opposition to typed dataclasses. We make the design async-first, allowing non-blocking I/O, and embody a locking mechanism to verify protected concurrent entry. Moreover, we permit scorching reloading by watching specified config recordsdata and triggering callbacks every time a change is detected. This setup provides a flexible, robust, and classy foundation for dynamically managing utility configurations.
async def load_config(config_class: Variety[T],
config_file: Non-compulsory[Union[str, Path]] = None,
env_prefix: Non-compulsory[str] = None,
watch: bool = False) -> T:
"""
Consolation function to shortly load configuration.
Args:
config_class: Dataclass to load configuration into
config_file: Non-compulsory configuration file path
env_prefix: Non-compulsory ambiance variable prefix
watch: Whether or not or to not observe for file modifications
Returns:
Configured event of config_class
"""
supervisor = AsyncConfigManager()
if config_file:
supervisor.add_file(config_file, priority=0, watch=watch)
if env_prefix:
supervisor.add_env(env_prefix, priority=100)
return await supervisor.load_config(config_class)
We add a useful helper function, load_config, to streamline the configuration setup course of. With just one title, we are going to load settings from a file, ambiance variables, or every proper right into a typed dataclass, optionally enabling scorching reloading. This utility makes the library beginner-friendly whereas nonetheless supporting superior use circumstances beneath the hood.
@dataclass
class DatabaseConfig:
"""Occasion database configuration."""
host: str = "localhost"
port: int = 5432
username: str = "admin"
password: str = ""
database: str = "myapp"
ssl_enabled: bool = False
pool_size: int = 10
@dataclass
class AppConfig:
"""Occasion utility configuration."""
debug: bool = False
log_level: str = "INFO"
secret_key: str = ""
database: DatabaseConfig = self-discipline(default_factory=DatabaseConfig)
redis_url: str = "redis://localhost:6379"
max_workers: int = 4
async def demo_simple_config():
"""Demo simple configuration loading."""
sample_config = {
"debug": True,
"log_level": "DEBUG",
"secret_key": "dev-secret-key",
"database": {
"host": "localhost",
"port": 5432,
"username": "testuser",
"password": "testpass",
"database": "testdb"
},
"max_workers": 8
}
supervisor = AsyncConfigManager()
supervisor.add_dict(sample_config, priority=0)
config = await supervisor.load_config(AppConfig)
print("=== Straightforward Configuration Demo ===")
print(f"Debug mode: {config.debug}")
print(f"Log stage: {config.log_level}")
print(f"Database host: {config.database.host}")
print(f"Database port: {config.database.port}")
print(f"Max workers: {config.max_workers}")
return config
We define two occasion configuration dataclasses: DatabaseConfig and AppConfig, which showcase how nested and typed configurations are structured. To disclose precise utilization, we write demo_simple_config(), the place we load a major dictionary into our config supervisor. This illustrates how effortlessly we are going to map structured information into type-safe Python objects, making configuration coping with clear, readable, and maintainable.
async def demo_advanced_config():
"""Demo superior configuration with a lot of sources."""
base_config = {
"debug": False,
"log_level": "INFO",
"secret_key": "production-secret",
"max_workers": 4
}
override_config = {
"debug": True,
"log_level": "DEBUG",
"database": {
"host": "dev-db.occasion.com",
"port": 5433
}
}
env_config = {
"secret_key": "env-secret-key",
"redis_url": "redis://prod-redis:6379"
}
print("n=== Superior Configuration Demo ===")
supervisor = AsyncConfigManager()
supervisor.add_dict(base_config, priority=0)
supervisor.add_dict(override_config, priority=50)
supervisor.add_dict(env_config, priority=100)
config = await supervisor.load_config(AppConfig)
print("Configuration sources merged:")
print(f"Debug mode: {config.debug} (from override)")
print(f"Log stage: {config.log_level} (from override)")
print(f"Secret key: {config.secret_key} (from env)")
print(f"Database host: {config.database.host} (from override)")
print(f"Redis URL: {config.redis_url} (from env)")
return config
async def demo_validation():
"""Demo configuration validation."""
print("n=== Configuration Validation Demo ===")
valid_config = {
"debug": True,
"log_level": "DEBUG",
"secret_key": "test-key",
"database": {
"host": "localhost",
"port": 5432
}
}
supervisor = AsyncConfigManager()
supervisor.add_dict(valid_config, priority=0)
attempt:
config = await supervisor.load_config(AppConfig)
print("✓ Reputable configuration loaded effectively")
print(f" Database SSL: {config.database.ssl_enabled} (default price)")
print(f" Database pool dimension: {config.database.pool_size} (default price)")
moreover ValidationError as e:
print(f"✗ Validation error: {e}")
incomplete_config = {
"debug": True,
"log_level": "DEBUG"
}
manager2 = AsyncConfigManager()
manager2.add_dict(incomplete_config, priority=0)
attempt:
config2 = await manager2.load_config(AppConfig)
print("✓ Configuration with defaults loaded effectively")
print(f" Secret key: '{config2.secret_key}' (default empty string)")
moreover ValidationError as e:
print(f"✗ Validation error: {e}")
We reveal superior choices of our config system by two examples. In demo_advanced_config(), we reveal how a lot of configuration sources, base, override, and ambiance, are merged based mostly totally on their priority, with higher-priority sources taking precedence. This highlights the pliability of managing environment-specific overrides. In demo_validation(), we validate every full and partial configurations. The system routinely fills in missing fields with defaults the place potential. It throws clear ValidationErrors when required fields are missing, ensuring kind safety and powerful configuration administration in real-world functions.
async def run_demos():
"""Run all demonstration capabilities."""
attempt:
await demo_simple_config()
await demo_advanced_config()
await demo_validation()
print("n=== All demos achieved effectively! ===")
moreover Exception as e:
print(f"Demo error: {e}")
import traceback
traceback.print_exc()
await run_demos()
if __name__ == "__main__":
attempt:
loop = asyncio.get_event_loop()
if loop.is_running():
print("Working in Jupyter/IPython ambiance")
print("Use: await run_demos()")
else:
asyncio.run(run_demos())
moreover RuntimeError:
asyncio.run(run_demos())
We conclude the tutorial with run_demos(), a utility that sequentially executes all demonstration capabilities, overlaying simple loading, multi-source merging, and validation. To help every Jupyter and commonplace Python environments, we embody conditional logic for working the demos appropriately. This ensures our configuration system is simple to examine, showcase, and mix into a variety of workflows correct out of the sector.
In conclusion, we effectively reveal how AsyncConfig provides a sturdy and extensible foundation for managing configuration in fashionable Python functions. We see how easy it’s to merge a lot of sources, validate configurations in opposition to typed schemas, and reply to reside file modifications in real-time. Whether or not or not we’re setting up microservices, async backends, or CLI devices, this library presents a flexible and developer-friendly technique to deal with configuration securely and successfully.
Attempt the Full Codes. All credit score rating for this evaluation goes to the researchers of this enterprise.
Sponsorship Different: Attain basically probably the most influential AI builders in US and Europe. 1M+ month-to-month readers, 500K+ group builders, infinite potentialities. [Explore Sponsorship]
Asif Razzaq is the CEO of Marktechpost Media Inc.. As a visionary entrepreneur and engineer, Asif is devoted to harnessing the potential of Artificial Intelligence for social good. His latest endeavor is the launch of an Artificial Intelligence Media Platform, Marktechpost, which stands out for its in-depth safety of machine finding out and deep finding out info that’s every technically sound and easily understandable by a big viewers. The platform boasts of over 2 million month-to-month views, illustrating its fame amongst audiences.
Elevate your perspective with NextTech Info, the place innovation meets notion.
Uncover the newest breakthroughs, get distinctive updates, and be a part of with a world neighborhood of future-focused thinkers.
Unlock tomorrow’s tendencies proper this second: study further, subscribe to our publication, and alter into part of the NextTech group at NextTech-news.com
Keep forward of the curve with NextBusiness 24. Discover extra tales, subscribe to our publication, and be a part of our rising neighborhood at nextbusiness24.com

