asyncio
🚀 Python's asyncio module allows developers to write asynchronous code using coroutines, event loops, tasks, and futures. It is ideal for I/O-bound applications such as web scraping, API requests, database queries, and concurrent networking.
import time

def task(name):
    print(f"Starting {name}")
    time.sleep(2)  # Blocking operation
    print(f"Finished {name}")

def main():
    task("Task 1")
    task("Task 2")

main()
⏳ Total time: 4 seconds (Tasks run one after another)
import asyncio

async def task(name):
    print(f"Starting {name}")
    await asyncio.sleep(2)  # Non-blocking operation
    print(f"Finished {name}")

async def main():
    await asyncio.gather(task("Task 1"), task("Task 2"))  # Runs tasks concurrently

asyncio.run(main())
🚀 Total time: 2 seconds (Tasks run concurrently)
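The two totals above can be measured instead of taken on trust. The sketch below times both versions with time.perf_counter(). The helper names (timed_sync, timed_async) and the shortened 0.2-second delay are assumptions made for illustration, not part of the original example.

```python
import asyncio
import time

DELAY = 0.2  # assumption: shortened from 2 seconds so the comparison runs quickly


def blocking_task():
    time.sleep(DELAY)  # blocks the whole thread


async def non_blocking_task():
    await asyncio.sleep(DELAY)  # yields to the event loop while waiting


def timed_sync():
    start = time.perf_counter()
    blocking_task()
    blocking_task()
    return time.perf_counter() - start


async def timed_async():
    start = time.perf_counter()
    await asyncio.gather(non_blocking_task(), non_blocking_task())
    return time.perf_counter() - start


sync_elapsed = timed_sync()
async_elapsed = asyncio.run(timed_async())
print(f"sequential: {sync_elapsed:.2f}s, concurrent: {async_elapsed:.2f}s")
```

The sequential run should take roughly twice the delay, while the concurrent run should take roughly one delay, because both sleeps overlap.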
Defining a coroutine in asyncio
async def my_coroutine():
    print("Hello, Async!")
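💡 Calling my_coroutine() on its own only creates a coroutine object. The body runs when you await it (await my_coroutine()) or hand it to asyncio.run() or asyncio.create_task().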
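To make that concrete, here is a small sketch using a variant of my_coroutine that returns its greeting instead of printing it (that change is an assumption, made so the result can be inspected). It shows that the call alone produces a coroutine object and that nothing executes until the event loop drives it.

```python
import asyncio


async def my_coroutine():
    return "Hello, Async!"


coro = my_coroutine()                # nothing has run yet; this is only a coroutine object
is_coro = asyncio.iscoroutine(coro)
print(type(coro).__name__, is_coro)

result = asyncio.run(coro)           # the body runs once the event loop drives the coroutine
print(result)
```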
await for Async Operations
import asyncio

async def fetch_data():
    print("Fetching data...")
    await asyncio.sleep(3)  # Simulating I/O operation
    return "Data received"

async def main():
    result = await fetch_data()
    print(result)

asyncio.run(main())
📝 await suspends execution of fetch_data() until asyncio.sleep(3) is complete. While it waits, the event loop is free to run other tasks.
asyncio.gather()
import asyncio

async def task1():
    await asyncio.sleep(2)
    print("Task 1 completed")

async def task2():
    await asyncio.sleep(3)
    print("Task 2 completed")

async def main():
    await asyncio.gather(task1(), task2())

asyncio.run(main())
✅ Total execution time: about 3 seconds, the duration of the longest task (the tasks run concurrently on one thread, not in parallel)
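asyncio.gather() also collects return values, and it returns them in the order the awaitables were passed in, not the order they finished. A minimal sketch, assuming a hypothetical delayed() helper that is not part of the original example:

```python
import asyncio


async def delayed(value, delay):
    await asyncio.sleep(delay)
    return value


async def main():
    # "slow" finishes last but is listed first, so it comes first in the result list
    return await asyncio.gather(delayed("slow", 0.2), delayed("fast", 0.05))


results = asyncio.run(main())
print(results)
```

This ordering guarantee is what makes it safe to line the results up against the inputs that produced them.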
asyncio.create_task() for Background Tasks
import asyncio

async def background_task():
    await asyncio.sleep(2)
    print("Background task completed")

async def main():
    task = asyncio.create_task(background_task())  # Runs in the background
    print("Main function is running...")
    await asyncio.sleep(1)
    print("Main function is done.")
    await task  # Wait for the background task to complete

asyncio.run(main())
💡 asyncio.create_task() schedules the coroutine on the event loop and returns immediately, so main() keeps running. The task starts the next time main() yields at an await.
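The scheduling order is easier to see when each step is recorded. The sketch below appends to a hypothetical events list (an assumption added for illustration) to show that the background task does not begin until main() reaches an await.

```python
import asyncio

events = []


async def background_task():
    events.append("background started")
    await asyncio.sleep(0.05)
    events.append("background finished")


async def main():
    task = asyncio.create_task(background_task())
    events.append("main continues")   # recorded before the task has started
    await asyncio.sleep(0)            # yield once so the event loop can start the task
    events.append("main resumed")
    await task                        # wait for the task so it is not cancelled at exit


asyncio.run(main())
print(events)
```

Keeping a reference to the task and awaiting it before main() returns matters: asyncio.run() cancels any tasks still pending when main() finishes.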
Example: fetching multiple URLs with asyncio
import asyncio
import random

async def fetch_data(url):
    print(f"Fetching from {url}...")
    await asyncio.sleep(random.randint(1, 3))  # Simulating network delay
    print(f"Completed fetching from {url}")
    return f"Data from {url}"

async def main():
    urls = ["https://api.example.com/1", "https://api.example.com/2", "https://api.example.com/3"]
    tasks = [fetch_data(url) for url in urls]
    results = await asyncio.gather(*tasks)
    print("All data fetched:", results)

asyncio.run(main())
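🚀 asyncio.gather(*tasks) runs the fetches concurrently and returns the results in the same order as the input list, so the total time is about the longest single delay rather than the sum of all delays.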
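With a long URL list, launching every request at once can overwhelm the remote server. One common way to cap concurrency is asyncio.Semaphore. The following is a sketch under assumptions: the limit of 2, the active and peak counters, and the shortened delay are all illustrative and not part of the original example.

```python
import asyncio

MAX_CONCURRENT = 2  # assumption: an arbitrary cap chosen for illustration
active = 0
peak = 0


async def fetch_data(url, semaphore):
    global active, peak
    async with semaphore:            # at most MAX_CONCURRENT coroutines pass this point at once
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.05)    # simulated network delay
        active -= 1
        return f"Data from {url}"


async def main():
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    urls = [f"https://api.example.com/{i}" for i in range(1, 6)]
    return await asyncio.gather(*(fetch_data(url, semaphore) for url in urls))


results = asyncio.run(main())
print(f"{len(results)} results, peak concurrency {peak}")
```

All five coroutines are still handed to gather() together. The semaphore only decides how many of them are inside the simulated request at the same moment.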
asyncio.Queue for Task Processing
A queue lets a fixed pool of workers pull jobs as they become free, which coordinates the work and caps how many jobs run at once.
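import asyncio

async def worker(queue):
    # Loop forever; main() cancels the workers once the queue is drained.
    # Checking queue.empty() instead would make a worker exit early if it
    # started before any items had been queued.
    while True:
        task = await queue.get()
        try:
            print(f"Processing {task}")
            await asyncio.sleep(1)
            print(f"Completed {task}")
        finally:
            queue.task_done()  # Always mark the item done so queue.join() can return

async def main():
    queue = asyncio.Queue()
    for i in range(5):
        await queue.put(f"Task {i}")
    workers = [asyncio.create_task(worker(queue)) for _ in range(3)]
    await queue.join()  # Wait until all tasks are processed
    for w in workers:
        w.cancel()  # Stop workers
    # Let the cancellations finish before the event loop closes
    await asyncio.gather(*workers, return_exceptions=True)

asyncio.run(main())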
✅ Three workers drain the five queued tasks concurrently, so the run takes about 2 seconds instead of 5.
Error Handling in asyncio
import asyncio

async def risky_task():
    await asyncio.sleep(2)
    raise ValueError("Something went wrong!")

async def main():
    try:
        await risky_task()
    except ValueError as e:
        print(f"Caught an error: {e}")

asyncio.run(main())
✅ An exception raised inside an awaited coroutine propagates to the caller, so an ordinary try/except catches it.
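When several coroutines run together, one failure should not necessarily hide the results of the others. A minimal sketch using the return_exceptions=True option of asyncio.gather(); the safe_task() helper and the shortened delays are assumptions added for illustration:

```python
import asyncio


async def risky_task():
    await asyncio.sleep(0.05)
    raise ValueError("Something went wrong!")


async def safe_task():
    await asyncio.sleep(0.05)
    return "ok"


async def main():
    # return_exceptions=True returns exceptions as results instead of raising the first one
    return await asyncio.gather(safe_task(), risky_task(), return_exceptions=True)


results = asyncio.run(main())
for result in results:
    if isinstance(result, Exception):
        print(f"Failed: {result!r}")
    else:
        print(f"Succeeded: {result}")
```

Without return_exceptions=True, gather() raises the first exception to the caller, and the try/except pattern from the example above applies to the gather() call as a whole.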
asyncio with Databases
Example using aiomysql for MySQL:
import asyncio
import aiomysql

async def fetch_users():
    # Connection settings are placeholders; substitute your own credentials
    conn = await aiomysql.connect(host='localhost', user='root', password='password', db='test_db')
    try:
        cursor = await conn.cursor()
        await cursor.execute("SELECT * FROM users")
        users = await cursor.fetchall()
        print(users)
        await cursor.close()
    finally:
        conn.close()  # Always release the connection, even if the query fails

asyncio.run(fetch_users())
🚀 Each await hands control back to the event loop while MySQL does its work, so other coroutines keep running during the query.
Using aiohttp for non-blocking HTTP requests:
import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ["https://example.com", "https://python.org"]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(results)

asyncio.run(main())
✅ Fetches multiple web pages concurrently over a single shared ClientSession.
When to use asyncio?

| ✅ Best For | ❌ Not Ideal For |
|------------|---------------|
| I/O-bound tasks (API requests, DB queries) | CPU-bound tasks (heavy computations) |
| Concurrent network requests | Synchronous file I/O |
| Background tasks (e.g., email sending) | Simple scripts with no concurrency needs |
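For CPU-bound tasks, use concurrent.futures.ProcessPoolExecutor or multiprocessing. A ThreadPoolExecutor suits blocking I/O calls, but in standard CPython builds it does not speed up CPU-bound Python code because of the global interpreter lock (GIL).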
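CPU-bound work can still be driven from async code by handing it to a process pool through loop.run_in_executor(). The sketch below uses ProcessPoolExecutor, which is built on multiprocessing. The count_primes() function and its inputs are assumptions chosen only to give the processes something to compute.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def count_primes(limit):
    """CPU-bound work: count the primes below limit by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Each call runs in a separate process, so the event loop stays responsive
        futures = [loop.run_in_executor(pool, count_primes, n) for n in (10_000, 20_000)]
        return await asyncio.gather(*futures)


if __name__ == "__main__":  # needed on platforms that spawn fresh worker processes
    print(asyncio.run(main()))
```

The function passed to the pool has to be defined at module level so it can be pickled and sent to the worker processes.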
🔹 Coroutines (async def) are at the core of asyncio.
🔹 await suspends the current coroutine until the awaited operation completes, letting the event loop run other work in the meantime.
🔹 asyncio.gather() runs multiple coroutines concurrently.
🔹 asyncio.create_task() schedules background tasks.
🔹 Pair asyncio with async database drivers and HTTP clients (such as aiomysql and aiohttp) so that I/O waits overlap instead of adding up.
🚀 Now you're ready to master async programming in Python! 🎯
You’ll build a Task Manager that can schedule and process tasks asynchronously using Django (Backend), FastAPI (Microservices), Celery (Task Queue), Redis (Message Broker), and React.js (Frontend).
✅ User Dashboard: Create & manage tasks.
✅ Asynchronous Processing: Background task execution.
✅ Real-time Updates: WebSockets for status updates.
✅ FastAPI Microservice: Separate microservice for async task handling.
✅ Celery & Redis: Background task processing.
1️⃣ User submits a task from the frontend.
2️⃣ Django API stores task details in MySQL.
3️⃣ FastAPI microservice receives and processes async requests.
4️⃣ Celery + Redis handles task execution.
5️⃣ Django WebSockets push real-time updates to the user.
6️⃣ React frontend displays task progress dynamically.
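Before wiring up the real services, the flow above can be rehearsed in plain asyncio. This is only a simulation under stated assumptions: a dict stands in for the MySQL table, an asyncio.Queue stands in for Redis, a coroutine stands in for the Celery worker, and a list stands in for the WebSocket messages. None of these names come from the actual project code.

```python
import asyncio

tasks_db = {}        # stand-in for the MySQL table behind the Django API
status_updates = []  # stand-in for messages pushed over WebSockets


def set_status(task_id, status):
    tasks_db[task_id]["status"] = status
    status_updates.append((task_id, status))  # step 5: notify the frontend


async def worker(broker):
    # Stand-in for a Celery worker consuming jobs from the broker
    while True:
        task_id = await broker.get()
        set_status(task_id, "Running")
        await asyncio.sleep(0.05)  # simulated long-running job
        set_status(task_id, "Completed")
        broker.task_done()


async def main():
    broker = asyncio.Queue()  # stand-in for Redis
    consumer = asyncio.create_task(worker(broker))

    tasks_db[1] = {"title": "Send report", "status": None}  # steps 1-2: store the task
    set_status(1, "Pending")
    await broker.put(1)       # steps 3-4: hand the job off for processing

    await broker.join()       # wait until the job has been processed
    consumer.cancel()
    await asyncio.gather(consumer, return_exceptions=True)


asyncio.run(main())
print(status_updates)
```

The status values follow the Pending, Running, Completed progression that the frontend is meant to display in step 6.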
Install Django & Celery:
pip install django djangorestframework celery redis channels
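Set up Django for WebSockets & Celery:
# settings.py
INSTALLED_APPS += ["channels"]
# Channels serves WebSockets through ASGI; this assumes the project package
# is named myproject, matching the Celery worker command
ASGI_APPLICATION = "myproject.asgi.application"
CELERY_BROKER_URL = "redis://localhost:6379/0"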
Define Task Model:
from django.db import models

class Task(models.Model):
    title = models.CharField(max_length=255)
    status = models.CharField(max_length=20, default="Pending")  # Pending, Running, Completed
Create Celery Task:
from celery import shared_task
import time

@shared_task
def process_task(task_id):
    from .models import Task  # Imported here so the app registry is ready when the task runs
    task = Task.objects.get(id=task_id)
    task.status = "Running"
    task.save()
    time.sleep(10)  # Simulate long process
    task.status = "Completed"
    task.save()
Run Celery Worker:
celery -A myproject worker --loglevel=info
Install FastAPI & Redis (the async client formerly published as aioredis now ships inside the redis package as redis.asyncio):
pip install fastapi uvicorn redis
Create API for async task processing:
from fastapi import FastAPI
import asyncio

app = FastAPI()

@app.post("/run-task/")
async def run_task():
    await asyncio.sleep(5)  # Simulate async process
    return {"message": "Task Completed"}
Run FastAPI server:
uvicorn main:app --reload
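Create WebSocket connection. Django Channels speaks plain WebSocket rather than the Socket.IO protocol, so the browser's built-in WebSocket API is used here and no client package needs to be installed:
import { useEffect, useState } from "react";

// Assumption: the Channels consumer is routed at /ws/tasks/ and sends JSON such as {"status": "Running"}
const WS_URL = "ws://localhost:8000/ws/tasks/";

export default function TaskStatus() {
  const [status, setStatus] = useState("Pending");

  useEffect(() => {
    const socket = new WebSocket(WS_URL);
    socket.onmessage = (event) => {
      const data = JSON.parse(event.data);
      setStatus(data.status);
    };
    // Close the connection when the component unmounts
    return () => socket.close();
  }, []);

  return <h2>Task Status: {status}</h2>;
}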