How to Build a High-Performance Distributed Task Routing System Using Kombu with Topic Exchanges and Concurrent Workers

 

In this tutorial, we build a fully functional event-driven workflow using Kombu, treating messaging as a core architectural capability. We walk step by step through the setup of exchanges, routing keys, background workers, and producers, so we can observe a small distributed system in action. As we implement each component, we see how clean message flow, asynchronous processing, and routing patterns give us the same building blocks that production microservices rely on every day.

!pip install kombu


import threading
import time
import logging
import uuid
import datetime
import sys


from kombu import Connection, Exchange, Queue, Producer, Consumer
from kombu.mixins import ConsumerMixin


logging.basicConfig(
   level=logging.INFO,
   format='%(message)s',
   handlers=[logging.StreamHandler(sys.stdout)],
   force=True
)
logger = logging.getLogger(__name__)


BROKER_URL = "memory://localhost/"

We begin by installing Kombu, importing dependencies, and configuring logging so we can clearly see every message flowing through the system. We also set the in-memory broker URL, allowing us to run everything locally in Colab without needing RabbitMQ. This setup forms the foundation for our distributed messaging workflow.

media_exchange = Exchange('media_exchange', type='topic', durable=True)


task_queues = [
   Queue('video_queue', media_exchange, routing_key='video.#'),
   Queue('audit_queue', media_exchange, routing_key='#'),
]

We define a topic exchange to flexibly route messages using wildcard patterns. We also create two queues: one dedicated to video-related tasks, bound with video.# so it receives keys that start with "video.", and an audit queue bound with #, which matches every routing key. Using topic routing, we can precisely control how messages flow across the system.
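To make the wildcard behaviour concrete, here is a minimal sketch of AMQP-style topic matching in plain Python. It is an illustration only and not Kombu's implementation: topic_matches, queues_for, and BINDINGS are hypothetical names, and a given transport may treat edge cases differently.

```python
from typing import Dict, List

# Bindings mirroring the two queues declared in the tutorial.
BINDINGS: Dict[str, str] = {
    "video_queue": "video.#",
    "audit_queue": "#",
}


def topic_matches(pattern: str, routing_key: str) -> bool:
    """AMQP-style match: '*' is exactly one word, '#' is zero or more words."""
    p_words = pattern.split(".")
    k_words = routing_key.split(".") if routing_key else []

    def match(i: int, j: int) -> bool:
        if i == len(p_words):
            return j == len(k_words)
        if p_words[i] == "#":
            # Let '#' absorb 0..n of the remaining words.
            return any(match(i + 1, nxt) for nxt in range(j, len(k_words) + 1))
        if j < len(k_words) and p_words[i] in ("*", k_words[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


def queues_for(routing_key: str) -> List[str]:
    """Names of the queues whose binding pattern matches the routing key."""
    return [q for q, pattern in BINDINGS.items() if topic_matches(pattern, routing_key)]


print(queues_for("video.upload"))  # ['video_queue', 'audit_queue']
print(queues_for("user.login"))    # ['audit_queue']
```

A video event therefore lands in both queues, while a login event reaches only the audit queue.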

class Worker(ConsumerMixin):
   def __init__(self, connection, queues):
       self.connection = connection
       self.queues = queues
       self.should_stop = False


   def get_consumers(self, Consumer, channel):
       return [
           Consumer(queues=self.queues,
                    callbacks=[self.on_message],
                    accept=['json'],
                    prefetch_count=1)
       ]


   def on_message(self, body, message):
       routing_key = message.delivery_info['routing_key']
       payload_id = body.get('id', 'unknown')


       logger.info(f"\n⚡ RECEIVED MSG via key: [{routing_key}]")
       logger.info(f"   Payload ID: {payload_id}")
      
       try:
           # Dispatch on the routing-key prefix; every other key is only audited.
           if routing_key.startswith('video.'):
               self.process_video(body)
           else:
               logger.info("   🔍 [Audit] Logging event...")
          
           message.ack()
           logger.info("   ✅ ACKNOWLEDGED")


       except Exception as e:
           logger.error(f"   ❌ ERROR: {e}")
           # Reject so a failed message is not left unacknowledged.
           message.reject()


   def process_video(self, body):
       logger.info("   ⚙  [Processor] Transcoding video (Simulating work...)")
       time.sleep(0.5)

We implement a custom worker using Kombu’s ConsumerMixin so that we can run it in a background thread. In the message callback, we inspect the routing key, invoke the appropriate processing function, and acknowledge the message. Because a video message matches both bindings, the worker receives one copy from each queue. This worker architecture gives us clean, concurrent message consumption with full control.
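One way to structure such a callback is a small dispatch table keyed on the first word of the routing key, with an explicit decision on every path: acknowledge on success, reject on failure. The sketch below runs without a broker. FakeMessage, transcode, HANDLERS, and handle are hypothetical names, and FakeMessage only imitates the ack() and reject() calls the worker relies on.

```python
from typing import Callable, Dict


class FakeMessage:
    """Hypothetical stand-in exposing the ack()/reject() calls the worker uses."""

    def __init__(self, routing_key: str):
        self.delivery_info = {"routing_key": routing_key}
        self.state = "RECEIVED"

    def ack(self) -> None:
        self.state = "ACK"

    def reject(self) -> None:
        self.state = "REJECTED"


def transcode(body: dict) -> None:
    """Pretend video handler that fails on malformed tasks."""
    if "file" not in body:
        raise ValueError("video task without a 'file' field")


# Map the first word of the routing key to a handler; unknown prefixes do nothing extra.
HANDLERS: Dict[str, Callable[[dict], None]] = {"video": transcode}


def handle(body: dict, message: FakeMessage) -> str:
    prefix = message.delivery_info["routing_key"].split(".")[0]
    try:
        HANDLERS.get(prefix, lambda _body: None)(body)
        message.ack()     # success: the broker may discard the message
    except Exception:
        message.reject()  # failure: do not leave the message unacknowledged
    return message.state


print(handle({"file": "movie.mp4"}, FakeMessage("video.upload")))
print(handle({}, FakeMessage("video.upload")))
```

Keeping the handlers in a table means a new message family needs one new entry rather than another branch in the callback.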

def publish_messages(connection):
   producer = Producer(connection)
  
   tasks = [
       ('video.upload', {'file': 'movie.mp4'}),
       ('user.login', {'user': 'admin'}),
   ]


   logger.info("\n🚀 PRODUCER: Starting to publish messages...")
  
   for r_key, data in tasks:
       data['id'] = str(uuid.uuid4())[:8]
      
       logger.info(f"📤 SENDING: {r_key} -> {data}")
      
       producer.publish(
           data,
           exchange=media_exchange,
           routing_key=r_key,
           serializer='json'
       )
       time.sleep(1.5)


   logger.info("🏁 PRODUCER: Done.")

We now build a producer that sends structured JSON payloads into the exchange with different routing keys. We generate a short unique ID for each event and observe how it is routed to the matching queues. This mirrors real-world microservice event publishing, where producers and consumers remain decoupled.
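The payload preparation can be separated from publishing. The following sketch wraps each payload in an envelope with an ID and a UTC timestamp, and checks that the result survives a JSON round trip, since the producer serializes with JSON. build_event and the routing_key and sent_at fields are assumptions added for illustration and are not part of the tutorial's payloads.

```python
import datetime
import json
import uuid


def build_event(routing_key: str, data: dict) -> dict:
    """Wrap a payload with an id and a UTC timestamp without mutating the input."""
    return {
        **data,
        "id": str(uuid.uuid4())[:8],
        "routing_key": routing_key,
        "sent_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    }


event = build_event("video.upload", {"file": "movie.mp4"})

# The envelope must survive a JSON round trip before it is handed to the producer.
assert json.loads(json.dumps(event)) == event
print(event["id"], event["sent_at"])
```

Returning a new dictionary leaves the original task definitions untouched, which helps if the same list is published more than once.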

def run_example():
   with Connection(BROKER_URL) as conn:
       worker = Worker(conn, task_queues)
       worker_thread = threading.Thread(target=worker.run)
       worker_thread.daemon = True
       worker_thread.start()
      
       logger.info("✅ SYSTEM: Worker thread started.")
       time.sleep(1)


       try:
           publish_messages(conn)
           time.sleep(2)
       except KeyboardInterrupt:
           pass
       finally:
           worker.should_stop = True
           # Give the consumer loop a moment to notice the flag and exit.
           worker_thread.join(timeout=2)
           logger.info("\n👋 SYSTEM: Execution complete.")


if __name__ == "__main__":
   run_example()

We start the worker in a background thread and fire the producer in the main thread. This structure gives us a mini distributed system running in Colab. By observing the logs, we see messages published → routed → consumed → acknowledged, completing the full event-processing lifecycle.
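The start, signal, and wait pattern behind this structure can be shown with the standard library alone. This is a simplified sketch, not Kombu code: a queue.Queue stands in for the broker, a threading.Event stands in for the stop flag, and all names are hypothetical.

```python
import queue
import threading

inbox = queue.Queue()      # stands in for the broker queue
stop = threading.Event()   # stands in for the worker's stop flag
processed = []


def worker_loop() -> None:
    # Poll with a timeout so the stop flag is checked regularly,
    # and keep draining until the inbox is empty.
    while not stop.is_set() or not inbox.empty():
        try:
            processed.append(inbox.get(timeout=0.1))
        except queue.Empty:
            continue


t = threading.Thread(target=worker_loop, daemon=True)
t.start()

for i in range(3):
    inbox.put({"id": i})

stop.set()          # ask the worker to finish
t.join(timeout=2)   # wait for it instead of relying on daemon teardown
print(len(processed), t.is_alive())
```

Joining the thread after setting the flag makes the shutdown order explicit: the main thread only reports completion once the worker has drained its queue.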

In conclusion, we orchestrated a distributed task-routing pipeline that processes events as they arrive. We saw how Kombu abstracts away much of the complexity of messaging systems while still giving us fine-grained control over routing, consumption, and worker concurrency. Having watched messages move from producer to exchange to queue to worker, we have a deeper appreciation for event-driven system design and a foundation that we can scale into robust microservices, background processors, and enterprise-grade workflows.




A Complete Workflow for Automated Prompt Optimization Using Gemini Flash, Few-Shot Selection, and Evolutionary Instruction Search

 

In this tutorial, we shift from traditional prompt crafting to a more systematic, programmable approach by treating prompts as tunable parameters rather than static text. Instead of guessing which instruction or example works best, we build an optimization loop around Gemini 2.0 Flash that experiments, evaluates, and automatically selects the strongest prompt configuration. In this implementation, we watch our model improve step by step, demonstrating how prompt engineering becomes far more powerful when we orchestrate it with data-driven search rather than intuition.

import google.generativeai as genai
import json
import random
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
import numpy as np
from collections import Counter


def setup_gemini(api_key: Optional[str] = None):
   if api_key is None:
       api_key = input("Enter your Gemini API key: ").strip()
   genai.configure(api_key=api_key)
   model = genai.GenerativeModel('gemini-2.0-flash-exp')
   print("✓ Gemini 2.0 Flash configured")
   return model


@dataclass
class Example:
   text: str
   sentiment: str
   def to_dict(self):
       return {"text": self.text, "sentiment": self.sentiment}


@dataclass
class Prediction:
   sentiment: str
   reasoning: str = ""
   confidence: float = 1.0

We import all required libraries and define the setup_gemini helper to configure Gemini 2.0 Flash. We also create the Example and Prediction data classes to represent dataset entries and model outputs in a clean, structured way.

def create_dataset() -> Tuple[List[Example], List[Example]]:
   train_data = [
       Example("This movie was absolutely fantastic! Best film of the year.", "positive"),
       Example("Terrible experience, waste of time and money.", "negative"),
       Example("The product works as expected, nothing special.", "neutral"),
       Example("I'm blown away by the quality and attention to detail!", "positive"),
       Example("Disappointing and overpriced. Would not recommend.", "negative"),
       Example("It's okay, does the job but could be better.", "neutral"),
       Example("Incredible customer service and amazing results!", "positive"),
       Example("Complete garbage, broke after one use.", "negative"),
       Example("Average product, met my basic expectations.", "neutral"),
       Example("Revolutionary! This changed everything for me.", "positive"),
       Example("Frustrating bugs and poor design choices.", "negative"),
       Example("Decent quality for the price point.", "neutral"),
       Example("Exceeded all my expectations, truly remarkable!", "positive"),
       Example("Worst purchase I've ever made, avoid at all costs.", "negative"),
       Example("It's fine, nothing to complain about really.", "neutral"),
       Example("Absolutely stellar performance, 5 stars!", "positive"),
       Example("Broken and unusable, total disaster.", "negative"),
       Example("Meets requirements, standard quality.", "neutral"),
   ]
   val_data = [
       Example("Absolutely love it, couldn't be happier!", "positive"),
       Example("Broken on arrival, very upset.", "negative"),
       Example("Works fine, no major issues.", "neutral"),
       Example("Outstanding performance and great value!", "positive"),
       Example("Regret buying this, total letdown.", "negative"),
       Example("Adequate for basic use.", "neutral"),
   ]
   return train_data, val_data


class PromptTemplate:
   def __init__(self, instruction: str = "", examples: List[Example] = None):
       self.instruction = instruction
       self.examples = examples or []
   def format(self, text: str) -> str:
       prompt_parts = []
       if self.instruction:
           prompt_parts.append(self.instruction)
       if self.examples:
           prompt_parts.append("\nExamples:")
           for ex in self.examples:
               prompt_parts.append(f"\nText: {ex.text}")
               prompt_parts.append(f"Sentiment: {ex.sentiment}")
       prompt_parts.append(f"\nText: {text}")
       prompt_parts.append("Sentiment:")
       return "\n".join(prompt_parts)
   def clone(self):
       return PromptTemplate(self.instruction, self.examples.copy())

We generate a small but diverse sentiment dataset for training and validation using the create_dataset function. We then define PromptTemplate, which lets us assemble an instruction, few-shot examples, and the current query into a single prompt string. We treat the template as a programmable object so we can swap instructions and examples during optimization.
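To see what the assembled prompt looks like, here is a standalone sketch that mirrors the template logic with plain tuples, assuming the parts are separated by newline characters. render_prompt is a hypothetical helper written for this illustration and is not part of the tutorial's classes.

```python
from typing import List, Tuple


def render_prompt(instruction: str, examples: List[Tuple[str, str]], text: str) -> str:
    """Build instruction + few-shot examples + query, one part per line."""
    parts = [instruction] if instruction else []
    if examples:
        parts.append("\nExamples:")
        for ex_text, ex_sentiment in examples:
            parts.append(f"\nText: {ex_text}")
            parts.append(f"Sentiment: {ex_sentiment}")
    parts.append(f"\nText: {text}")
    parts.append("Sentiment:")  # left open for the model to complete
    return "\n".join(parts)


prompt = render_prompt(
    "Classify the sentiment: positive, negative, or neutral.",
    [("Terrible experience, waste of time and money.", "negative")],
    "Works fine, no major issues.",
)
print(prompt)
```

Ending the prompt on an open "Sentiment:" line nudges the model toward answering with a single label.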

class SentimentModel:
   def __init__(self, model, prompt_template: PromptTemplate):
       self.model = model
       self.prompt_template = prompt_template


   def predict(self, text: str) -> Prediction:
       prompt = self.prompt_template.format(text)
       try:
           response = self.model.generate_content(prompt)
           result = response.text.strip().lower()
           for sentiment in ['positive', 'negative', 'neutral']:
               if sentiment in result:
                   return Prediction(sentiment=sentiment, reasoning=result)
           return Prediction(sentiment='neutral', reasoning=result)
       except Exception as e:
           return Prediction(sentiment='neutral', reasoning=str(e))


   def evaluate(self, dataset: List[Example]) -> float:
       correct = 0
       for example in dataset:
           pred = self.predict(example.text)
           if pred.sentiment == example.sentiment:
               correct += 1
       # Guard against an empty dataset to avoid division by zero.
       return (correct / len(dataset)) * 100 if dataset else 0.0

We wrap Gemini in the SentimentModel class so we can call it like a regular classifier. We format prompts via the template, call generate_content, and post-process the text to extract one of three sentiments. We also add an evaluate method so we can measure accuracy over any dataset with a single call.
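The parsing and scoring steps can be exercised offline before spending API calls. In the sketch below, StubModel is a hypothetical keyword-based stand-in for the Gemini client that only imitates the generate_content(...).text shape, and extract_label is a stricter variant of the substring check: it picks the first label that appears as a whole word and returns None instead of defaulting to neutral.

```python
import re
from types import SimpleNamespace
from typing import List, Optional, Tuple

LABEL_PATTERN = re.compile(r"\b(positive|negative|neutral)\b")


def extract_label(raw: str) -> Optional[str]:
    """Return the first label that appears as a whole word, or None if absent."""
    found = LABEL_PATTERN.search(raw.lower())
    return found.group(1) if found else None


class StubModel:
    """Hypothetical offline stand-in for the model client: keyword rules only."""

    def generate_content(self, prompt: str) -> SimpleNamespace:
        text = prompt.lower()
        if "love" in text or "great" in text:
            return SimpleNamespace(text="Positive.")
        if "broken" in text or "regret" in text:
            return SimpleNamespace(text="negative")
        return SimpleNamespace(text="Neutral")


def accuracy(model, dataset: List[Tuple[str, str]]) -> float:
    """Percentage of examples whose parsed label equals the gold label."""
    if not dataset:
        return 0.0
    correct = 0
    for text, label in dataset:
        if extract_label(model.generate_content(text).text) == label:
            correct += 1
    return 100.0 * correct / len(dataset)


val = [
    ("Absolutely love it, couldn't be happier!", "positive"),
    ("Broken on arrival, very upset.", "negative"),
    ("Works fine, no major issues.", "neutral"),
    ("Regret buying this, total letdown.", "negative"),
]
print(f"{accuracy(StubModel(), val):.1f}%")
```

Returning None for unparseable replies keeps formatting failures visible in the score instead of silently counting them as neutral.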

class PromptOptimizer:
   def __init__(self, model):
       self.model = model
       self.instruction_candidates = [
           "Analyze the sentiment of the following text. Classify as positive, negative, or neutral.",
           "Classify the sentiment: positive, negative, or neutral.",
           "Determine if this text expresses positive, negative, or neutral sentiment.",
           "What is the emotional tone? Answer: positive, negative, or neutral.",
           "Sentiment classification (positive/negative/neutral):",
           "Evaluate sentiment and respond with exactly one word: positive, negative, or neutral.",
       ]


   def select_best_examples(self, train_data: List[Example], val_data: List[Example], n_examples: int = 3) -> List[Example]:
       best_examples = None
       best_score = -1.0  # below any real accuracy, so the first trial is always kept
       for _ in range(10):
           examples_by_sentiment = {
               'positive': [e for e in train_data if e.sentiment == 'positive'],
               'negative': [e for e in train_data if e.sentiment == 'negative'],
               'neutral': [e for e in train_data if e.sentiment == 'neutral']
           }
           selected = []
           for sentiment in ['positive', 'negative', 'neutral']:
               if examples_by_sentiment[sentiment]:
                   selected.append(random.choice(examples_by_sentiment[sentiment]))
           remaining = [e for e in train_data if e not in selected]
           while len(selected) < n_examples and remaining:
               selected.append(random.choice(remaining))
               remaining.remove(selected[-1])
           template = PromptTemplate(instruction=self.instruction_candidates[0], examples=selected)
           test_model = SentimentModel(self.model, template)
           score = test_model.evaluate(val_data[:3])
           if score > best_score:
               best_score = score
               best_examples = selected
       return best_examples


   def optimize_instruction(self, examples: List[Example], val_data: List[Example]) -> str:
       best_instruction = self.instruction_candidates[0]
       best_score = 0
       for instruction in self.instruction_candidates:
           template = PromptTemplate(instruction=instruction, examples=examples)
           test_model = SentimentModel(self.model, template)
           score = test_model.evaluate(val_data)
           if score > best_score:
               best_score = score
               best_instruction = instruction
       return best_instruction

We introduce the PromptOptimizer class and define a pool of candidate instructions to test. We implement select_best_examples to search for a small, diverse set of few-shot examples and optimize_instruction to score each instruction variant on validation data. We are effectively turning prompt design into a lightweight search problem over examples and instructions.
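The example search boils down to two pieces: a stratified sampler and a keep-the-best loop. The sketch below isolates them so they can be tested without a model. stratified_sample, random_search, and toy_score are hypothetical names, and the toy scorer (which simply prefers shorter examples) only stands in for validation accuracy.

```python
import random
from typing import Callable, Dict, List, Sequence, Tuple

LabeledText = Tuple[str, str]  # (text, sentiment)


def stratified_sample(train: Sequence[LabeledText], n: int, rng: random.Random) -> List[LabeledText]:
    """Pick one example per label first, then fill up to n from the rest."""
    by_label: Dict[str, List[LabeledText]] = {}
    for ex in train:
        by_label.setdefault(ex[1], []).append(ex)
    picked = [rng.choice(group) for group in by_label.values()][:n]
    rest = [ex for ex in train if ex not in picked]
    rng.shuffle(rest)
    return picked + rest[: max(0, n - len(picked))]


def random_search(train, n: int, trials: int,
                  score_fn: Callable[[List[LabeledText]], float], seed: int = 0):
    """Keep the best-scoring sample; -inf guarantees the first trial is accepted."""
    rng = random.Random(seed)
    best, best_score = None, float("-inf")
    for _ in range(trials):
        candidate = stratified_sample(train, n, rng)
        score = score_fn(candidate)
        if score > best_score:
            best, best_score = candidate, score
    return best, best_score


def toy_score(examples: List[LabeledText]) -> float:
    """Toy stand-in for validation accuracy: shorter examples score higher."""
    return -float(sum(len(text) for text, _ in examples))


train = [
    ("great", "positive"), ("really great stuff", "positive"),
    ("bad", "negative"), ("awful and broken", "negative"),
    ("ok", "neutral"), ("fine, nothing special", "neutral"),
]
best, score = random_search(train, n=3, trials=10, score_fn=toy_score)
print(best, score)
```

Starting the best score below any achievable value means the search always returns a usable set, even when every candidate scores zero.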

   # Method of PromptOptimizer (continues the class defined above).
   def compile(self, train_data: List[Example], val_data: List[Example], n_examples: int = 3) -> PromptTemplate:
       best_examples = self.select_best_examples(train_data, val_data, n_examples)
       best_instruction = self.optimize_instruction(best_examples, val_data)
       optimized_template = PromptTemplate(instruction=best_instruction, examples=best_examples)
       return optimized_template


def main():
   print("="*70)
   print("Prompt Optimization Tutorial")
   print("Stop Writing Prompts, Start Programming Them!")
   print("="*70)


   model = setup_gemini()
   train_data, val_data = create_dataset()
   print(f"✓ {len(train_data)} training examples, {len(val_data)} validation examples")


   baseline_template = PromptTemplate(
       instruction="Classify sentiment as positive, negative, or neutral.",
       examples=[]
   )
   baseline_model = SentimentModel(model, baseline_template)
   baseline_score = baseline_model.evaluate(val_data)


   manual_examples = train_data[:3]
   manual_template = PromptTemplate(
       instruction="Classify sentiment as positive, negative, or neutral.",
       examples=manual_examples
   )
   manual_model = SentimentModel(model, manual_template)
   manual_score = manual_model.evaluate(val_data)


   optimizer = PromptOptimizer(model)
   optimized_template = optimizer.compile(train_data, val_data, n_examples=4)

We add the compile method to combine the best examples and best instructions into a final optimized PromptTemplate. Inside main, we configure Gemini, build the dataset, and evaluate both a zero-shot baseline and a simple manual few-shot prompt. We then call the optimizer to produce our compiled, optimized prompt for sentiment analysis.

   # main() continues here.
   optimized_model = SentimentModel(model, optimized_template)
   optimized_score = optimized_model.evaluate(val_data)


   print(f"Baseline (zero-shot):     {baseline_score:.1f}%")
   print(f"Manual few-shot:          {manual_score:.1f}%")
   print(f"Optimized (compiled):     {optimized_score:.1f}%")


   print(f"\nInstruction: {optimized_template.instruction}")
   print(f"\nSelected Examples ({len(optimized_template.examples)}):")
   for i, ex in enumerate(optimized_template.examples, 1):
       print(f"\n{i}. Text: {ex.text}")
       print(f"   Sentiment: {ex.sentiment}")


   test_cases = [
       "This is absolutely amazing, I love it!",
       "Completely broken and unusable.",
       "It works as advertised, no complaints."
   ]


   for test_text in test_cases:
       print(f"\nInput: {test_text}")
       pred = optimized_model.predict(test_text)
       print(f"Predicted: {pred.sentiment}")


   print("✓ Tutorial Complete!")


if __name__ == "__main__":
   main()

We evaluate the optimized model and compare its accuracy against the baseline and manual few-shot setups. We print the chosen instruction and the selected examples so we can inspect what the optimizer discovers, and then we run a few live test sentences to see predictions in action. We finish by summarizing the improvements and reinforcing the idea that prompts can be tuned programmatically rather than written by hand.

In conclusion, we showed how programmatic prompt optimization provides a repeatable, evidence-driven workflow for designing high-performing prompts. We began with a fragile zero-shot baseline, then tested instruction variants, selected diverse few-shot examples, and compiled an optimized template that can outperform manual attempts. Instead of relying on trial-and-error prompting, we orchestrated a controlled optimization cycle. We can also extend this pipeline to new tasks, richer datasets, and more advanced scoring methods, allowing us to engineer prompts with precision, confidence, and scalability.



Unsloth AI and NVIDIA are Revolutionizing Local LLM Fine-Tuning: From RTX Desktops to DGX Spark

 

Fine-tune popular AI models faster with Unsloth on NVIDIA RTX AI PCs and the new DGX Spark to build personalized assistants for coding, creative work, and complex agentic workflows.

The landscape of modern AI is shifting. We are moving away from a total reliance on massive, generalized cloud models and entering the era of local, agentic AI. Whether it is tuning a chatbot to handle hyper-specific product support or building a personal assistant that manages intricate schedules, the potential for generative AI on local hardware is boundless.

However, developers face a persistent bottleneck: How do you get a Small Language Model (SLM) to punch above its weight class and respond with high accuracy for specialized tasks?

The answer is Fine-Tuning, and the tool of choice is Unsloth.

Unsloth provides an easy, high-speed method to customize models. Optimized for efficient, low-memory training on NVIDIA GPUs, Unsloth scales from RTX desktops all the way to DGX Spark, the world’s smallest AI supercomputer.

The Fine-Tuning Paradigm

Think of fine-tuning as a high-intensity boot camp for your AI. By feeding the model examples tied to a specific workflow, it learns new patterns, adapts to specialized tasks, and dramatically improves accuracy.

Depending on their hardware and goals, developers generally use one of three main methods:

1. Parameter-Efficient Fine-Tuning (PEFT)

  • The Tech: LoRA (Low-Rank Adaptation) or QLoRA.
  • How it Works: Instead of retraining the whole brain, this updates only a small portion of the model. It is the most efficient way to inject domain knowledge without breaking the bank.
  • Best For: Improving coding accuracy, legal/scientific adaptation, or tone alignment.
  • Data Needed: Small datasets (100–1,000 prompt-sample pairs).
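A quick calculation shows why LoRA counts as updating "only a small portion" of the model. For a weight matrix of shape d_out x d_in, LoRA trains two low-rank factors of shapes d_out x r and r x d_in, so the trainable count is r * (d_out + d_in). The hidden size of 4096 and the rank of 16 below are assumed values chosen for illustration, not figures from this article.

```python
def lora_trainable_params(d_out: int, d_in: int, rank: int) -> int:
    """Parameters in the two LoRA factors B (d_out x r) and A (r x d_in)."""
    return rank * (d_out + d_in)


d = 4096   # assumed hidden size, for illustration only
r = 16     # assumed LoRA rank

full = d * d                            # updating the whole matrix
lora = lora_trainable_params(d, d, r)   # updating only the low-rank factors

print(f"full: {full:,}  lora: {lora:,}  ratio: {lora / full:.2%}")
# full: 16,777,216  lora: 131,072  ratio: 0.78%
```

Under these assumptions, each adapted matrix trains less than one percent of the parameters that a full update would touch.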

2. Full Fine-Tuning

  • The Tech: Updating all model parameters.
  • How it Works: This is a total overhaul. It is essential when the model needs to rigidly adhere to specific formats or strict guardrails.
  • Best For: Advanced AI agents and distinct persona constraints.
  • Data Needed: Large datasets (1,000+ prompt-sample pairs).

3. Reinforcement Learning (RL)

  • The Tech: Preference optimization (RLHF/DPO).
  • How it Works: The model learns by interacting with an environment and receiving feedback signals to improve behavior over time.
  • Best For: High-stakes domains (Law, Medicine) or autonomous agents.
  • Data Needed: Action model + Reward model + RL Environment.

The Hardware Reality: VRAM Management Guide

One of the most critical factors in local fine-tuning is Video RAM (VRAM). Unsloth is magic, but physics still applies. Here is the breakdown of what hardware you need based on your target model size and tuning method.

For PEFT (LoRA/QLoRA)

This is where most hobbyists and individual developers will live.

  • <12B Parameters: ~8GB VRAM (Standard GeForce RTX GPUs).
  • 12B–30B Parameters: ~24GB VRAM (Perfect for GeForce RTX 5090).
  • 30B–120B Parameters: ~80GB VRAM (Requires DGX Spark or RTX PRO).

For Full Fine-Tuning

For when you need total control over the model weights.

  • <3B Parameters: ~25GB VRAM (GeForce RTX 5090 or RTX PRO).
  • 3B–15B Parameters: ~80GB VRAM (DGX Spark territory).

For Reinforcement Learning

The cutting edge of agentic behavior.

  • <12B Parameters: ~12GB VRAM (GeForce RTX 5070).
  • 12B–30B Parameters: ~24GB VRAM (GeForce RTX 5090).
  • 30B–120B Parameters: ~80GB VRAM (DGX Spark).
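The three lists above can be folded into a small lookup for quick planning. This is a sketch built only from the approximate figures in this guide: VRAM_GUIDE and approx_vram_gb are hypothetical names, and the handling of boundary sizes (a size on a shared boundary falls into the larger tier) is an assumption, since the guide does not specify it.

```python
from typing import Dict, List, Optional, Tuple

# (exclusive upper bound in billions of parameters, approximate VRAM in GB)
VRAM_GUIDE: Dict[str, List[Tuple[float, int]]] = {
    "peft": [(12, 8), (30, 24), (120, 80)],
    "full": [(3, 25), (15, 80)],
    "rl": [(12, 12), (30, 24), (120, 80)],
}


def approx_vram_gb(method: str, params_b: float) -> Optional[int]:
    """Return the guide's approximate VRAM figure, or None if the size is off the chart."""
    tiers = VRAM_GUIDE[method]
    for upper_b, vram_gb in tiers:
        if params_b < upper_b:
            return vram_gb
    # The top tier includes its own upper bound (for example "30B-120B").
    top_upper, top_vram = tiers[-1]
    return top_vram if params_b == top_upper else None


print(approx_vram_gb("peft", 8))    # 8
print(approx_vram_gb("full", 7))    # 80
print(approx_vram_gb("full", 70))   # None
```

A None result signals that the guide gives no figure for that combination, such as full fine-tuning of a 70B model.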

Unsloth: The “Secret Sauce” of Speed

Why is Unsloth winning the fine-tuning race? It comes down to math.

LLM fine-tuning involves billions of matrix multiplications, the kind of math well suited for parallel, GPU-accelerated computing. Unsloth excels by translating the complex matrix multiplication operations into efficient, custom kernels on NVIDIA GPUs. This optimization allows Unsloth to boost the performance of the Hugging Face transformers library by 2.5x on NVIDIA GPUs.

By combining raw speed with ease of use, Unsloth is democratizing high-performance AI, making it accessible to everyone from a student on a laptop to a researcher on a DGX system.

Representative Use Case Study 1: The “Personal Knowledge Mentor”

The Goal: Take a base model (like Llama 3.2) and teach it to respond in a specific, high-value style, acting as a mentor who explains complex topics using simple analogies and always ends with a thought-provoking question to encourage critical thinking.

The Problem: Standard system prompts are brittle. To get a high-quality “Mentor” persona, you must provide a 500+ token instruction block. This creates a “Token Tax” that slows down every response and eats up valuable memory. Over long conversations, the model suffers from “Persona Drift,” eventually forgetting its rules and reverting to a generic, robotic assistant. Furthermore, it is nearly impossible to “prompt” a specific verbal rhythm or subtle “vibe” without the model sounding like a forced caricature.

The Solution: Using Unsloth to run a local QLoRA fine-tune on a GeForce RTX GPU, powered by a curated dataset of 50–100 high-quality “Mentor” dialogue examples. This process “bakes” the personality directly into the model’s neural weights rather than relying on the temporary memory of a prompt.
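A curated dialogue dataset of this kind is often stored as one JSON object per line, each holding a short list of role and content turns. The record layout, the sample dialogue, and the follows_mentor_style check below are all hypothetical illustrations, not a format prescribed by this article. The check covers only the one persona rule that is easy to verify mechanically: every reply ends with a question.

```python
import json
from typing import Dict, List

# Hypothetical record layout: a list of role/content turns.
record = {
    "messages": [
        {"role": "user", "content": "What is a hash table?"},
        {
            "role": "assistant",
            "content": (
                "Think of a coat check: your ticket number tells the attendant exactly "
                "which hook holds your coat. What might happen if two tickets pointed "
                "to the same hook?"
            ),
        },
    ]
}


def follows_mentor_style(rec: Dict[str, List[Dict[str, str]]]) -> bool:
    """True when there is at least one assistant reply and each ends with a question."""
    replies = [m["content"] for m in rec["messages"] if m["role"] == "assistant"]
    return bool(replies) and all(r.rstrip().endswith("?") for r in replies)


line = json.dumps(record)                     # one JSON object per line (JSONL)
print(follows_mentor_style(json.loads(line)))
```

Running a check like this over all 50 to 100 examples before training catches records that would teach the model to drop the closing question.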

The Result: A standard model might miss the analogy or forget the closing question when the topic gets difficult. The fine-tuned model acts as a “Native Mentor.” It maintains its persona indefinitely without a single line of system instructions. It picks up on implicit patterns, the specific way a mentor speaks, making the interaction feel authentic and fluid.

Representative Use Case Study 2: The “Legacy Code” Architect

To see the power of local fine-tuning, look no further than the banking sector.

The Problem: Banks run on ancient code (COBOL, Fortran). Standard 7B models hallucinate when trying to modernize this logic, and sending proprietary banking code to GPT-4 is a massive security violation.

The Solution: Using Unsloth to fine-tune a 32B model (like Qwen 2.5 Coder) specifically on the company’s 20-year-old “spaghetti code.”

The Result: A standard 7B model translates line-by-line. The fine-tuned 32B model acts as a “Senior Architect.” It holds entire files in context, refactoring 2,000-line monoliths into clean microservices while preserving exact business logic, all performed securely on local NVIDIA hardware.

Representative Use Case Study 3: The Privacy-First “AI Radiologist”

While text is powerful, the next frontier of local AI is Vision. Medical institutions sit on mountains of imaging data (X-rays, CT scans) that cannot legally be uploaded to public cloud models due to HIPAA/GDPR compliance.

The Problem: Radiologists are overwhelmed, and standard Vision Language Models (VLMs) like Llama 3.2 Vision are too generalized, identifying a “person” easily, but missing subtle hairline fractures or early-stage anomalies in low-contrast X-rays.

The Solution: A healthcare research team fine-tunes with Unsloth. Instead of training from scratch (costing millions), they take a pre-trained Llama 3.2 Vision (11B) model and fine-tune it locally on an NVIDIA DGX Spark or dual-RTX 6000 Ada workstation. They feed the model a curated, private dataset of 5,000 anonymized X-rays paired with expert radiologist reports, using LoRA to update vision encoders specifically for medical anomalies.

The Outcome: The result is a specialized “AI Resident” operating entirely offline.

  • Accuracy: Detection of specific pathologies improves over the base model.
  • Privacy: No patient data ever leaves the on-premise hardware.
  • Speed: Unsloth optimizes the vision adapters, cutting training time from weeks to hours, allowing for weekly model updates as new data arrives.

Unsloth publishes a tutorial on fine-tuning vision models with Llama 3.2 that walks through the technical details of building this kind of solution.

Ready to Start?

Unsloth and NVIDIA have provided comprehensive guides to get you running immediately.


Thanks to the NVIDIA AI team for the thought leadership and resources for this article. The NVIDIA AI team has supported this content.
