
Python Async Programming: Mastering asyncio 🚀

Python's asyncio module allows developers to write asynchronous code using coroutines, event loops, tasks, and futures. It is ideal for I/O-bound applications such as web scraping, API requests, database queries, and concurrent networking.


1️⃣ Understanding Synchronous vs Asynchronous Programming

Synchronous (Blocking) Execution

import time

def task(name):
    print(f"Starting {name}")
    time.sleep(2)  # Blocking operation
    print(f"Finished {name}")

def main():
    task("Task 1")
    task("Task 2")

main()

Total time: 4 seconds (Tasks run one after another)

Asynchronous (Non-Blocking) Execution

import asyncio

async def task(name):
    print(f"Starting {name}")
    await asyncio.sleep(2)  # Non-blocking operation
    print(f"Finished {name}")

async def main():
    await asyncio.gather(task("Task 1"), task("Task 2"))  # Runs tasks concurrently

asyncio.run(main())

🚀 Total time: 2 seconds (Tasks run concurrently)


2️⃣ Basics of asyncio

Declaring an Async Function (Coroutine)

async def my_coroutine():
    print("Hello, Async!")

💡 Calling my_coroutine() on its own only creates a coroutine object; the body runs only once it is awaited (await my_coroutine()) or handed to asyncio.run().
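
A quick demonstration of the difference (a minimal sketch): calling the coroutine function returns a coroutine object, and nothing executes until the event loop drives it.

import asyncio

async def my_coroutine():
    print("Hello, Async!")

coro = my_coroutine()  # Nothing printed yet: this only creates a coroutine object
print(coro)            # <coroutine object my_coroutine at 0x...>

asyncio.run(coro)      # The event loop runs the coroutine; now "Hello, Async!" prints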

Using await for Async Operations

import asyncio

async def fetch_data():
    print("Fetching data...")
    await asyncio.sleep(3)  # Simulating I/O operation
    return "Data received"

async def main():
    result = await fetch_data()
    print(result)

asyncio.run(main())

📝 await suspends execution of fetch_data() until asyncio.sleep(3) is complete.


3️⃣ Running Multiple Coroutines Concurrently

Using asyncio.gather()

import asyncio

async def task1():
    await asyncio.sleep(2)
    print("Task 1 completed")

async def task2():
    await asyncio.sleep(3)
    print("Task 2 completed")

async def main():
    await asyncio.gather(task1(), task2())

asyncio.run(main())

Total execution time: 3 seconds (the coroutines run concurrently, so the total equals the longest task, not the sum)


4️⃣ Using asyncio.create_task() for Background Tasks

import asyncio

async def background_task():
    await asyncio.sleep(2)
    print("Background task completed")

async def main():
    task = asyncio.create_task(background_task())  # Runs in the background
    print("Main function is running...")
    await asyncio.sleep(1)
    print("Main function is done.")
    await task  # Wait for the background task to complete

asyncio.run(main())

💡 asyncio.create_task() schedules a coroutine on the event loop immediately, letting the current function keep running.


5️⃣ Handling Async I/O with asyncio

Example: Making Multiple API Requests (Simulated)

import asyncio
import random

async def fetch_data(url):
    print(f"Fetching from {url}...")
    await asyncio.sleep(random.randint(1, 3))  # Simulating network delay
    print(f"Completed fetching from {url}")
    return f"Data from {url}"

async def main():
    urls = ["https://api.example.com/1", "https://api.example.com/2", "https://api.example.com/3"]
    tasks = [fetch_data(url) for url in urls]
    results = await asyncio.gather(*tasks)
    print("All data fetched:", results)

asyncio.run(main())

🚀 asyncio.gather(*tasks) unpacks the task list and fetches all URLs concurrently.


6️⃣ Using asyncio.Queue for Task Processing

A queue allows for task coordination and controlled execution.

import asyncio

async def worker(queue):
    while True:  # Loop forever; workers are cancelled in main() once the queue is drained
        task = await queue.get()  # Suspends until an item is available
        print(f"Processing {task}")
        await asyncio.sleep(1)
        queue.task_done()
        print(f"Completed {task}")

async def main():
    queue = asyncio.Queue()
    
    for i in range(5):
        await queue.put(f"Task {i}")

    workers = [asyncio.create_task(worker(queue)) for _ in range(3)]
    
    await queue.join()  # Wait until all tasks are processed
    for w in workers:
        w.cancel()  # Stop workers

asyncio.run(main())

Efficiently processes 5 tasks with 3 workers. Each worker loops forever rather than checking queue.empty(), which is racy when several workers share a queue; main() cancels the workers once queue.join() confirms every item was marked done.


7️⃣ Exception Handling in asyncio

Handling Errors in Asynchronous Tasks

import asyncio

async def risky_task():
    await asyncio.sleep(2)
    raise ValueError("Something went wrong!")

async def main():
    try:
        await risky_task()
    except ValueError as e:
        print(f"Caught an error: {e}")

asyncio.run(main())

An exception raised inside a coroutine propagates to the awaiting caller, where an ordinary try/except handles it.
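
That covers a single awaited coroutine. When several tasks run under asyncio.gather(), the first exception propagates out of the await by default (the remaining tasks keep running); passing return_exceptions=True collects errors alongside normal results instead. A minimal sketch:

import asyncio

async def good_task():
    await asyncio.sleep(1)
    return "ok"

async def bad_task():
    await asyncio.sleep(1)
    raise ValueError("Something went wrong!")

async def main():
    # return_exceptions=True returns raised exceptions as results instead of re-raising the first one
    results = await asyncio.gather(good_task(), bad_task(), return_exceptions=True)
    for result in results:
        if isinstance(result, Exception):
            print(f"Task failed: {result}")
        else:
            print(f"Task succeeded: {result}")

asyncio.run(main())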


8️⃣ Integrating asyncio with Databases

Example using aiomysql for MySQL:

import aiomysql
import asyncio

async def fetch_users():
    # Credentials and database name are placeholders; adjust for your setup
    conn = await aiomysql.connect(host='localhost', user='root', password='password', db='test_db')
    cursor = await conn.cursor()
    await cursor.execute("SELECT * FROM users")
    users = await cursor.fetchall()
    print(users)
    await cursor.close()
    conn.close()  # Closing the connection itself is a synchronous call in aiomysql

asyncio.run(fetch_users())

🚀 The query awaits the MySQL round trip without blocking the event loop, so other coroutines keep making progress in the meantime.
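
For repeated queries, opening a fresh connection each time is wasteful; aiomysql also provides a connection pool. A sketch using the same placeholder credentials:

import asyncio
import aiomysql

async def fetch_users_pooled():
    # Pool sizes and credentials are placeholders; adjust for your setup
    pool = await aiomysql.create_pool(host='localhost', user='root',
                                      password='password', db='test_db',
                                      minsize=1, maxsize=5)
    async with pool.acquire() as conn:       # Borrow a connection from the pool
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT * FROM users")
            print(await cursor.fetchall())
    pool.close()
    await pool.wait_closed()                 # Wait for all connections to shut down

asyncio.run(fetch_users_pooled())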


9️⃣ Real-World Example: Asynchronous Web Scraper

Using aiohttp for non-blocking HTTP requests:

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ["https://example.com", "https://python.org"]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(results)

asyncio.run(main())

Fetches multiple web pages concurrently over a single shared aiohttp session.


🔟 When to Use asyncio?

| ✅ Best For | ❌ Not Ideal For |
|------------|------------------|
| I/O-bound tasks (API requests, DB queries) | CPU-bound tasks (heavy computations) |
| Concurrent network requests | Synchronous file I/O |
| Background tasks (e.g., email sending) | Simple scripts with no concurrency needs |

For CPU-bound tasks, use concurrent.futures.ThreadPoolExecutor or multiprocessing.
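
As a sketch of that advice: a CPU-bound function can be handed to a process pool via loop.run_in_executor(), which wraps the blocking call in an awaitable so the event loop stays responsive (the function and workload below are illustrative):

import asyncio
from concurrent.futures import ProcessPoolExecutor

def heavy_computation(n):
    # Plain (non-async) CPU-bound function; runs in a separate process
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # run_in_executor returns an awaitable, so the event loop stays free
        result = await loop.run_in_executor(pool, heavy_computation, 10_000_000)
    print(result)

if __name__ == "__main__":  # Required for process pools on platforms that spawn workers
    asyncio.run(main())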


📌 Conclusion

🔹 Coroutines (async def) are at the core of asyncio.
🔹 await suspends execution until a coroutine is complete.
🔹 asyncio.gather() runs multiple coroutines concurrently.
🔹 asyncio.create_task() schedules background tasks.
🔹 Integrate asyncio with databases, web scraping, and APIs for high performance.

🚀 Now you're ready to master async programming in Python! 🎯

🚀 Real-World Project: Async Task Manager with Django, Celery, and FastAPI

Project Idea: Asynchronous Task Processing System

You’ll build a Task Manager that can schedule and process tasks asynchronously using Django (Backend), FastAPI (Microservices), Celery (Task Queue), Redis (Message Broker), and React.js (Frontend).


💡 Key Features

User Dashboard: Create & manage tasks.
Asynchronous Processing: Background task execution.
Real-time Updates: WebSockets for status updates.
FastAPI Microservice: Separate microservice for async task handling.
Celery & Redis: Background task processing.


🛠 Tech Stack

  • Backend (Django + FastAPI) – Django handles user management & UI, FastAPI processes async tasks.
  • Celery + Redis – Background task queue & message broker.
  • React.js – Frontend for users to create & monitor tasks.
  • WebSockets (Django Channels) – Real-time task status updates.
  • Docker – Deploy everything in containers.

🖥 Project Workflow

1️⃣ User submits a task from the frontend.
2️⃣ Django API stores task details in MySQL.
3️⃣ FastAPI microservice receives and processes async requests.
4️⃣ Celery + Redis handles task execution.
5️⃣ Django WebSockets push real-time updates to the user.
6️⃣ React frontend displays task progress dynamically.


👨‍💻 Project Implementation

1️⃣ Django Backend

Install Django & Celery:

pip install django djangorestframework celery redis channels channels-redis

Set up Django for WebSockets & Celery:

# settings.py
INSTALLED_APPS += ["channels"]
ASGI_APPLICATION = "myproject.asgi.application"  # Channels needs an ASGI entry point; "myproject" is assumed
CELERY_BROKER_URL = "redis://localhost:6379/0"

Define Task Model:

from django.db import models

class Task(models.Model):
    title = models.CharField(max_length=255)
    status = models.CharField(max_length=20, default="Pending")  # Pending, Running, Completed

Create Celery Task:

from celery import shared_task
import time

@shared_task
def process_task(task_id):
    from .models import Task
    task = Task.objects.get(id=task_id)
    task.status = "Running"
    task.save()
    time.sleep(10)  # Simulate long process
    task.status = "Completed"
    task.save()
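
Nothing above enqueues the task yet. A sketch of a DRF view that creates a Task row and hands its id to Celery (assuming the task lives in the app's tasks.py, the Celery convention; the view name and URL wiring are illustrative):

# views.py -- hypothetical endpoint that enqueues the background job
from rest_framework.decorators import api_view
from rest_framework.response import Response
from .models import Task
from .tasks import process_task

@api_view(["POST"])
def create_task(request):
    task = Task.objects.create(title=request.data.get("title", "Untitled"))
    process_task.delay(task.id)  # .delay() places the job on the Redis broker
    return Response({"id": task.id, "status": task.status})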

Run Celery Worker:

celery -A myproject worker --loglevel=info
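
The Celery task updates the database, but nothing yet pushes that change to the browser. A minimal sketch of the Channels side, assuming a consumers.py wired to the routing path ws/tasks/ and a channel layer (e.g. channels-redis) configured in CHANNEL_LAYERS:

# consumers.py -- group name "tasks" is an assumed example
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class TaskStatusConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        # Join a broadcast group so background jobs can reach every connected client
        await self.channel_layer.group_add("tasks", self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard("tasks", self.channel_name)

    async def task_update(self, event):
        # Handles group messages sent with {"type": "task_update", ...}
        await self.send(text_data=json.dumps({"status": event["status"]}))

The Celery task can then broadcast after each status change:

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

def notify(status):
    # Bridge from Celery's synchronous code into the async channel layer
    async_to_sync(get_channel_layer().group_send)(
        "tasks", {"type": "task_update", "status": status}
    )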

2️⃣ FastAPI Microservice

Install FastAPI & Uvicorn:

pip install fastapi uvicorn

Create API for async task processing:

from fastapi import FastAPI
import asyncio

app = FastAPI()

@app.post("/run-task/")
async def run_task():
    await asyncio.sleep(5)  # Simulate async process
    return {"message": "Task Completed"}

Run FastAPI server:

uvicorn main:app --reload
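
Nothing above shows how another service triggers this endpoint. One option is httpx (pip install httpx); the URL below is an assumption for a FastAPI instance running beside Django:

import asyncio
import httpx

async def trigger_task():
    async with httpx.AsyncClient() as client:
        # uvicorn serves on port 8000 by default; pass --port to avoid clashing with Django
        response = await client.post("http://localhost:8000/run-task/", timeout=30.0)
        print(response.json())  # {"message": "Task Completed"}

asyncio.run(trigger_task())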

3️⃣ React.js Frontend

Create the WebSocket connection. No extra client package is needed: Django Channels speaks the plain WebSocket protocol, which the browser's built-in WebSocket API handles (socket.io-client expects a Socket.IO server and will not connect to a Channels endpoint).

import { useEffect, useState } from "react";

export default function TaskStatus() {
  const [status, setStatus] = useState("Pending");

  useEffect(() => {
    // The path must match the Channels routing; "ws/tasks/" matches the consumer sketch above
    const socket = new WebSocket("ws://localhost:8000/ws/tasks/");
    socket.onmessage = (event) => {
      const data = JSON.parse(event.data);
      setStatus(data.status);
    };
    return () => socket.close(); // Close the connection when the component unmounts
  }, []);

  return <h2>Task Status: {status}</h2>;
}

🔥 Why This Project?

  • Real-world async processing with Django & FastAPI
  • Hands-on experience with Celery & Redis
  • WebSockets for real-time updates
  • Frontend & backend integration
  • Scalable & production-ready

💡 Next Steps

  • 🚀 Add authentication (JWT)
  • Integrate notifications (email & push)
  • 📊 Dashboard with task analytics
  • 🐳 Deploy with Docker & Kubernetes