AW Dev Rethought

Truth can only be found in one place: the code - Robert C. Martin

🧩 Python Automation Recipes – 🧵 Parallel Task Executor


Description:

📌 Introduction

Some automation workflows don’t need to run one task after another.

If tasks are independent — like downloading files, checking URLs, processing reports, or running cleanup jobs — they can run in parallel to save time.

This automation recipe shows how to run multiple tasks concurrently using Python’s ThreadPoolExecutor.


🔎 Explanation

  • The script defines multiple independent tasks.
  • ThreadPoolExecutor runs those tasks using multiple worker threads.
  • Each task returns a result when completed.
  • as_completed() lets us handle results as soon as each task finishes.

This is useful for:

  • API calls
  • downloads
  • file processing
  • health checks
  • report generation

✅ Key Takeaways

  • 🧵 Run independent tasks concurrently.
  • ⏱️ Save time in I/O-heavy workflows.
  • ⚙️ Great for automation pipelines and batch jobs.

Code Snippet:

from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
import time
import random

# --- Step 1: Define sample automation tasks ---
def run_task(task_name: str) -> str:
    """
    Simulate an automation task that takes some time to finish.
    Replace this logic with real tasks like API calls, file checks,
    downloads, or report generation.
    """
    duration = random.randint(1, 5)

    print(f"Started: {task_name} | Estimated time: {duration}s")
    time.sleep(duration)

    return f"Completed: {task_name} in {duration}s"


# --- Step 2: Define the list of tasks to run ---
tasks = [
    "Download report",
    "Check website status",
    "Clean temporary files",
    "Generate summary",
    "Send notification",
]

# --- Step 3: Run tasks in parallel ---
print(f"Parallel execution started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")

with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks to the thread pool
    future_to_task = {
        executor.submit(run_task, task): task
        for task in tasks
    }

    # Process results as each task completes
    for future in as_completed(future_to_task):
        task_name = future_to_task[future]

        try:
            result = future.result()
            print(result)

        except Exception as e:
            print(f"Failed: {task_name} | Error: {e}")

print("\nAll parallel tasks completed.")

Link copied!

Comments

Add Your Comment

Comment Added!