ORCA Celery Deployment Guide
This document covers the distributed task queue infrastructure for the ORCA pipeline.
Installation
See the main README for environment setup and configuration.
Verify Celery connectivity (requires a valid ~/orca-conf.yml):
# Test broker connection
python -c "from orca.celery import app; print(app.control.ping(timeout=2))"
Cluster Architecture
                          ┌─────────────────────────────────────────┐
                          │          Shared Storage (NFS)           │
                          │        /opt/devel/pipeline/envs/        │
                          │             /home/pipeline/             │
                          │            /lustre/pipeline/            │
                          └─────────────────────────────────────────┘
                                   │       │       │       │
         ┌─────────────────────────┼───────┼───────┼───────┼─────────────────────────┐
         │                         │       │       │       │                         │
         ▼                         ▼       ▼       ▼       ▼                         ▼
┌──────────────────┐    ┌──────────────────────────────────────────────┐    ┌──────────────────┐
│   lwacalimhead   │    │                 Worker Nodes                 │    │    lwacalim10    │
│   (10.41.0.74)   │    │        lwacalim00-09 (10.41.0.75-84)         │    │   (10.41.0.85)   │
│                  │    │                                              │    │                  │
│  ┌────────────┐  │    │  ┌─────────┐ ┌─────────┐ ┌─────────┐         │    │  ┌────────────┐  │
│  │  RabbitMQ  │◀─┼────┼──│ Worker  │ │ Worker  │ │ Worker  │ ...     │────┼─▶│   Redis    │  │
│  │   :5672    │  │    │  │ default │ │ imaging │ │ bandpass│         │    │  │   :6379    │  │
│  └────────────┘  │    │  └─────────┘ └─────────┘ └─────────┘         │    │  └────────────┘  │
│                  │    │                                              │    │                  │
│                  │    │  Each node runs Celery workers that:         │    │  Stores:         │
│                  │    │  • Pull tasks from RabbitMQ                  │    │  • Task results  │
│                  │    │  • Execute pipeline functions                │    │  • Spectrum cache│
│                  │    │  • Push results to Redis                     │    │                  │
│                  │    │                                              │    │  ┌────────────┐  │
│                  │    │                                              │    │  │   Flower   │  │
│                  │    │                                              │    │  │   :5555    │  │
│                  │    │                                              │    │  └────────────┘  │
└──────────────────┘    └──────────────────────────────────────────────┘    └──────────────────┘
Node Summary
| Hostname | IP | Role |
|---|---|---|
| lwacalimhead | 10.41.0.74 | RabbitMQ broker |
| lwacalim00-09 | 10.41.0.75-84 | Worker nodes |
| lwacalim10 | 10.41.0.85 | Redis backend, Flower, Worker node |
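The hostnames and addresses in the table follow a regular pattern, so the worker list can be generated rather than typed out. The following is a minimal sketch, assuming the addresses are assigned consecutively as the table implies; `worker_hosts` is an illustrative name and not part of orca.

```python
def worker_hosts(first=0, last=10, base_octet=75):
    """Map lwacalimNN hostnames to 10.41.0.x addresses.

    Assumes consecutive numbering, as in the Node Summary table:
    lwacalim00 -> 10.41.0.75 ... lwacalim10 -> 10.41.0.85.
    """
    return {
        f"lwacalim{i:02d}": f"10.41.0.{base_octet + i}"
        for i in range(first, last + 1)
    }


hosts = worker_hosts()
for name, ip in hosts.items():
    print(f"{name}  {ip}")
```

The default range covers lwacalim00 through lwacalim10, the same set of nodes targeted by the `pdsh -w lwacalim[00-10]` commands later in this guide.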
Components Overview
Message Flow
┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐
│ Pipeline Script │   │    RabbitMQ     │   │  Celery Worker  │   │      Redis      │
│   (your code)   │   │ (message queue) │   │ (task executor) │   │ (result store)  │
└────────┬────────┘   └────────┬────────┘   └────────┬────────┘   └────────┬────────┘
         │                     │                     │                     │
         │ 1. task.delay()     │                     │                     │
         │ ──────────────────▶ │                     │                     │
         │                     │                     │                     │
         │                     │ 2. deliver task     │                     │
         │                     │ ──────────────────▶ │                     │
         │                     │                     │                     │
         │                     │                     │ 3. execute function │
         │                     │                     │ ─────────────────── │
         │                     │                     │                     │
         │                     │                     │ 4. store result     │
         │                     │                     │ ──────────────────▶ │
         │                     │                     │                     │
         │ 5. result.get()     │                     │                     │
         │ ◀───────────────────────────────────────────────────────────────│
         │                     │                     │                     │
Simplified view:
Your Script ──▶ RabbitMQ ──▶ Worker ──▶ Redis ──▶ Your Script
              (queue task)  (execute)  (result)   (retrieve)
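The five steps can be modelled with nothing but the Python standard library. The sketch below is a toy model and not Celery: a `queue.Queue` stands in for RabbitMQ, a dict stands in for Redis, and a thread stands in for a worker. All names (`delay`, `worker_loop`, `get`, `run_once`) are illustrative.

```python
import queue
import threading
import uuid

broker = queue.Queue()   # stand-in for RabbitMQ: holds pending task messages
result_store = {}        # stand-in for Redis: task id -> result
finished = {}            # task id -> Event that is set once the result is stored


def delay(func, *args):
    """Step 1: enqueue a task message and return its id immediately."""
    task_id = str(uuid.uuid4())
    finished[task_id] = threading.Event()
    broker.put((task_id, func, args))
    return task_id


def worker_loop():
    """Steps 2-4: receive a message, run the function, store the result."""
    while True:
        message = broker.get()
        if message is None:  # sentinel: shut the worker down
            break
        task_id, func, args = message
        result_store[task_id] = func(*args)
        finished[task_id].set()


def get(task_id, timeout=5.0):
    """Step 5: block until the result is available, then return it."""
    if not finished[task_id].wait(timeout):
        raise TimeoutError(f"task {task_id} did not finish in {timeout}s")
    return result_store[task_id]


def add(x, y):
    return x + y


def run_once(func, *args):
    """Start one worker thread, push a single task through the flow, stop the worker."""
    thread = threading.Thread(target=worker_loop)
    thread.start()
    try:
        return get(delay(func, *args))
    finally:
        broker.put(None)
        thread.join()


print(run_once(add, 2, 3))
```

The point of the model is that the caller never talks to the worker directly: it only writes to the queue and reads from the result store, which is why the broker and the result backend can live on different hosts.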
Component Details
| Component | Location | Port | Purpose |
|---|---|---|---|
| RabbitMQ | lwacalimhead | 5672 | Message broker - queues tasks |
| Redis | lwacalim10 | 6379 | Result backend - stores task results |
| Celery Workers | lwacalim00-10 | - | Execute tasks from queues |
| Flower | lwacalim10 | 5555 | Web UI for monitoring |
Queues
Defined in orca/celery.py:
| Queue | Purpose | Routed Tasks |
|---|---|---|
| | General processing | Most tasks |
| | Imaging pipeline | |
| | Bandpass calibration | |
| | Cosmology processing | |
Configuration
User Configuration File
Each user needs ~/orca-conf.yml:
queue:
  prefix: default
  broker_uri: pyamqp://<username>:<password>@rabbitmq.calim.mcs.pvt:5672/<vhost>
  result_backend_uri: redis://10.41.0.85:6379/0
telescope:
  n_ant: 352
  n_subband: 16
  n_chan: 192
  outriggers: [...] # See default-orca-conf.yml
execs:
  wsclean: /opt/bin/wsclean
  aoflagger: /opt/bin/aoflagger
cluster: calim
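A malformed `broker_uri` or `result_backend_uri` is an easy mistake to make when filling in this file. The sketch below uses only the standard library to split a URI into its parts so the host, port, and vhost can be checked by eye. `describe_uri` is an illustrative helper, not an orca function, and the credentials are placeholders.

```python
from urllib.parse import unquote, urlsplit


def describe_uri(uri):
    """Split a broker or result-backend URI into its parts (password omitted)."""
    parts = urlsplit(uri)
    return {
        "scheme": parts.scheme,
        "username": unquote(parts.username) if parts.username else None,
        "host": parts.hostname,
        "port": parts.port,
        # For pyamqp this is the vhost; for redis it is the database number.
        "path": unquote(parts.path.lstrip("/")),
    }


# Placeholder credentials for illustration only.
print(describe_uri("pyamqp://alice:secret@rabbitmq.calim.mcs.pvt:5672/alice"))
print(describe_uri("redis://10.41.0.85:6379/0"))
```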
Managing Workers
Starting Workers
# Basic worker on default queue
celery -A orca.celery worker \
--hostname=default@$(hostname) \
--loglevel=INFO \
--concurrency=40 \
-Q default
# With CPU pinning (recommended for production)
taskset -c 0-39 celery -A orca.celery worker \
--hostname=default@$(hostname) \
--loglevel=INFO \
--concurrency=40 \
-Q default
# Worker for specific queue
celery -A orca.celery worker \
--hostname=imaging@$(hostname) \
--loglevel=INFO \
--concurrency=40 \
-Q imaging
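The three commands above differ only in queue name and optional CPU pinning, so a launcher script could build them from one function. This is a sketch under that assumption; `worker_command` is a hypothetical helper, and it uses only the flags already shown above.

```python
import shlex
import socket


def worker_command(queue_name, concurrency=40, loglevel="INFO", cpus=None, host=None):
    """Return the argv list for one Celery worker on the given queue."""
    host = host or socket.gethostname()
    argv = [
        "celery", "-A", "orca.celery", "worker",
        f"--hostname={queue_name}@{host}",
        f"--loglevel={loglevel}",
        f"--concurrency={concurrency}",
        "-Q", queue_name,
    ]
    if cpus:  # e.g. "0-39" pins the worker to those cores with taskset
        argv = ["taskset", "-c", cpus] + argv
    return argv


# The host is passed explicitly here so the printed command is reproducible.
print(shlex.join(worker_command("imaging", host="lwacalim03")))
print(shlex.join(worker_command("default", cpus="0-39", host="lwacalim03")))
```

Returning a list rather than a string means the result can be handed straight to `subprocess.run` without shell quoting concerns.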
Stopping Workers
# Graceful shutdown (waits for current tasks)
pkill -15 -f 'celery.*worker'
# Force kill (immediate)
pkill -9 -f 'celery.*worker'
# Across all nodes with pdsh
pdsh -w lwacalim[00-10] "pkill -15 -f 'celery.*worker'"
Checking Worker Status
# From Python
python3 << 'EOF'
from orca.celery import app
# Ping all workers
print("Workers:", app.control.ping(timeout=2))
# Active queues
i = app.control.inspect()
print("Queues:", i.active_queues())
# Current tasks
print("Active:", i.active())
EOF
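The raw ping output gets hard to read once eleven nodes are involved. The sketch below reduces it to a list of workers that did not answer. It assumes the reply is a list of single-key dicts of the form `{"name@host": {"ok": "pong"}}` and that workers are named with the `default@$(hostname)` convention used above. `missing_workers` is an illustrative helper, and the sample replies are written by hand.

```python
def missing_workers(ping_replies, expected):
    """Return the expected worker names that did not answer the ping.

    ping_replies: list of single-key dicts, assumed to look like
                  [{"default@lwacalim00": {"ok": "pong"}}, ...].
    expected:     iterable of worker names such as "default@lwacalim00".
    """
    answered = {
        name
        for reply in ping_replies
        for name, status in reply.items()
        if status.get("ok") == "pong"
    }
    return sorted(set(expected) - answered)


# Hand-written sample replies; real ones come from app.control.ping(timeout=2).
replies = [
    {"default@lwacalim00": {"ok": "pong"}},
    {"default@lwacalim01": {"ok": "pong"}},
]
expected = [f"default@lwacalim{i:02d}" for i in range(3)]
print(missing_workers(replies, expected))
```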
Code Updates
Workers import the orca code once at startup, so they must be restarted to pick up any code change:
# 1. Stop workers gracefully
pdsh -w lwacalim[00-10] "pkill -15 -f 'celery.*worker'"
# 2. Wait for tasks to finish (or check Flower)
sleep 30
# 3. Pull code updates (if using git)
pdsh -w lwacalim[00-10] "cd /opt/devel/pipeline/distributed-pipeline && git pull"
# 4. Restart workers (in screen sessions on each node)
# Or use systemd if configured
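A fixed `sleep 30` may be too short for long tasks and too long for an idle cluster. One alternative is to poll until no worker reports a running task. The sketch below assumes no new tasks are being submitted during the wait, and it treats an empty or missing reply as idle. `wait_until_idle` is a hypothetical helper; the callable it takes is expected to behave like `app.control.inspect().active`, which is replaced here by scripted replies so the example runs without a cluster.

```python
import time


def wait_until_idle(get_active, timeout=300.0, interval=5.0, sleep=time.sleep):
    """Poll until no worker reports a running task.

    get_active: callable returning {worker_name: [task, ...]} or None.
    Returns True once idle, False if the timeout elapses first.
    """
    deadline = time.monotonic() + timeout
    while True:
        active = get_active() or {}  # None means no worker replied
        if not any(active.values()):
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(interval)


# Scripted replies standing in for a live cluster: busy twice, then idle.
scripted = iter([
    {"default@lwacalim00": [{"id": "1"}]},
    {"default@lwacalim00": [{"id": "1"}]},
    {"default@lwacalim00": []},
])
print(wait_until_idle(lambda: next(scripted), timeout=60, interval=0, sleep=lambda s: None))
```

Whether workers still answer inspect requests while they are shutting down is not covered here, so this check is probably best run before sending the stop signal.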
Monitoring
Flower Web UI
Flower provides real-time monitoring of workers and tasks.
# Start Flower (on lwacalim10)
celery -A orca.celery flower --port=5555
# Access via SSH tunnel
ssh -L 5555:localhost:5555 <user>@lwacalim10
# Then open: http://localhost:5555
Command-Line Monitoring
# Check Redis connectivity
python3 -c "
import redis
r = redis.Redis.from_url('redis://10.41.0.85:6379/0')
print('Redis PING:', r.ping())
print('Keys in DB:', r.dbsize())
"
# Check RabbitMQ queues (on lwacalimhead with sudo)
sudo rabbitmqctl list_queues -p <vhost>
# Check worker processes
pdsh -w lwacalim[00-10] "ps aux | grep 'celery.*worker' | grep -v grep | wc -l"
Inspecting Tasks
from orca.celery import app
# Get inspector
i = app.control.inspect()
# Active tasks (currently running)
i.active()
# Reserved tasks (fetched but not started)
i.reserved()
# Scheduled tasks (eta/countdown)
i.scheduled()
# Registered tasks
i.registered()
# Worker stats
i.stats()
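The inspector methods return nested dicts keyed by worker name, which are awkward to read for a busy cluster. The sketch below collapses such a reply into a count per task name. It assumes each task entry carries a `name` key and that `scheduled()` wraps the task under a `request` key. `count_by_task` is an illustrative helper, and the task names in the sample data are hypothetical.

```python
from collections import Counter


def count_by_task(inspect_reply):
    """Count tasks per task name across all workers.

    inspect_reply: {worker_name: [task_dict, ...]} as assumed for i.active(),
                   i.reserved(), or i.scheduled(); None if no worker replied.
    """
    counts = Counter()
    for tasks in (inspect_reply or {}).values():
        for task in tasks:
            # Assumption: scheduled() nests the task under "request".
            info = task.get("request", task)
            counts[info.get("name", "<unknown>")] += 1
    return counts


# Hand-written sample with hypothetical task names.
sample = {
    "default@lwacalim00": [{"id": "a1", "name": "example.task_a"}],
    "default@lwacalim01": [
        {"id": "b1", "name": "example.task_a"},
        {"id": "b2", "name": "example.task_b"},
    ],
}
for name, n in count_by_task(sample).most_common():
    print(f"{n:4d}  {name}")
```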
Adding New Users
1. RabbitMQ Setup
On lwacalimhead (requires sudo):
# Create vhost for the user
sudo rabbitmqctl add_vhost <username>
# Create user
sudo rabbitmqctl add_user <username> <password>
# Grant permissions on their vhost
sudo rabbitmqctl set_permissions -p <username> <username> ".*" ".*" ".*"
# Verify
sudo rabbitmqctl list_users
sudo rabbitmqctl list_vhosts
2. User Configuration
The new user creates ~/orca-conf.yml:
queue:
  prefix: default
  broker_uri: pyamqp://<username>:<password>@rabbitmq.calim.mcs.pvt:5672/<username>
  result_backend_uri: redis://10.41.0.85:6379/0 # Shared Redis is fine
telescope:
  n_ant: 352
  n_subband: 16
  n_chan: 192
  outriggers: [...] # Copy from default-orca-conf.yml
execs:
  wsclean: /opt/bin/wsclean
  aoflagger: /opt/bin/aoflagger
cluster: calim
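Passwords containing `@`, `:`, or `/` will break the `broker_uri` unless they are percent-encoded. The sketch below builds the URI with the vhost defaulting to the username, matching the RabbitMQ setup in step 1. `broker_uri` is a hypothetical helper, the credentials are placeholders, and it assumes the broker client decodes percent-encoded credentials, which is the usual URL convention.

```python
from urllib.parse import quote


def broker_uri(username, password, host="rabbitmq.calim.mcs.pvt", port=5672, vhost=None):
    """Build a pyamqp broker URI; the vhost defaults to the username."""
    vhost = username if vhost is None else vhost
    # safe="" also encodes "/", so "@", ":" and "/" in a password cannot break the URI.
    return (
        f"pyamqp://{quote(username, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(vhost, safe='')}"
    )


# Placeholder credentials for illustration only.
print(broker_uri("alice", "p@ss/word"))
```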
3. Environment Setup
# Create user's conda config (~/.condarc)
pkgs_dirs:
- /opt/devel/<username>/cache/conda
envs_dirs:
- /opt/devel/<username>/envs
- /opt/devel/pipeline/envs
channels:
- conda-forge
- defaults
# Create directories
mkdir -p /opt/devel/<username>/cache/conda
mkdir -p /opt/devel/<username>/envs
# Activate shared environment
conda activate /opt/devel/pipeline/envs/py38_orca_nkosogor
4. Verify Setup
# Test RabbitMQ connection
python3 -c "
from orca.celery import app
print(app.control.ping(timeout=2))
"
# Test Redis connection
python3 -c "
import redis
from orca.configmanager import queue_config
r = redis.Redis.from_url(queue_config.result_backend_uri)
print('Redis:', r.ping())
"
Troubleshooting
Workers Not Responding
# Check if workers are running
ps aux | grep celery
# Check if broker is reachable
nc -zv rabbitmq.calim.mcs.pvt 5672
# Check if Redis is reachable
nc -zv 10.41.0.85 6379
# Try starting worker with debug logging
celery -A orca.celery worker --loglevel=DEBUG
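Where `nc` is not installed, the same reachability check can be done from Python with the standard library. The sketch below demonstrates the helper against a throwaway local listener so that it runs anywhere. `port_open` is an illustrative name, not part of orca.

```python
import socket


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, timed out, or name not resolved
        return False


# Demonstrate against a temporary local listener on an OS-assigned port.
with socket.socket() as server:
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    local_port = server.getsockname()[1]
    print("listener reachable:", port_open("127.0.0.1", local_port))
```

On the cluster, `port_open("rabbitmq.calim.mcs.pvt", 5672)` and `port_open("10.41.0.85", 6379)` mirror the two `nc -zv` checks above.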
Import Errors on Worker Start
# Common issue: casacore library conflict
unset LD_LIBRARY_PATH
# Test imports manually
python3 -c "from orca.celery import app; print('OK')"
Tasks Stuck in Queue
# List queues and message counts
sudo rabbitmqctl list_queues -p <vhost>
# Purge a queue (deletes all pending tasks!)
celery -A orca.celery purge -Q <queue_name>
# Delete a queue entirely
celery -A orca.celery amqp queue.delete <queue_name>
Worker Memory Issues
Each worker child process is replaced after it has run 20 tasks (worker_max_tasks_per_child=20 in celery.py), which limits the memory growth caused by leaks in long-running processes.
# Check memory usage
pdsh -w lwacalim[00-10] "ps aux | grep 'celery.*worker' | grep -v grep | awk '{sum+=\$6} END {print sum/1024 \" MB\"}'"
Redis Full
# Check Redis memory
python3 -c "
import redis
r = redis.Redis.from_url('redis://10.41.0.85:6379/0')
info = r.info('memory')
print(f\"Used: {info['used_memory_human']}\")
print(f\"Peak: {info['used_memory_peak_human']}\")
"
# Results expire after 2 hours (result_expires=7200 in celery.py)
# Spectrum cache expires after 10 hours
Quick Reference
Key Files
| File | Purpose |
|---|---|
| | User configuration (broker, redis, telescope) |
| | Celery app definition, queues, routing |
| | Loads configuration |
| | Task definitions |
Key Commands
# Start worker
celery -A orca.celery worker -Q default --hostname=default@$(hostname) -c 40
# Check workers
python3 -c "from orca.celery import app; print(app.control.ping())"
# Stop workers
pkill -15 -f 'celery.*worker'
# Monitor (Flower)
celery -A orca.celery flower --port=5555
# Purge queue
celery -A orca.celery purge -Q <queue>
Service Locations
| Service | Host | Port | Config |
|---|---|---|---|
| RabbitMQ | lwacalimhead | 5672 | |
| Redis | lwacalim10 | 6379 | |
| Flower | lwacalim10 | 5555 | Started manually |
Infrastructure Reference
These services are already installed. This section is for reference/recovery only.
RabbitMQ
Docs: https://www.rabbitmq.com/docs
Install: https://www.rabbitmq.com/docs/install-debian
Location: lwacalimhead
Start: sudo systemctl start rabbitmq-server
Config: /etc/rabbitmq/rabbitmq.conf
Redis
Docs: https://redis.io/docs/
Install: https://redis.io/docs/getting-started/installation/install-redis-on-linux/
Location: lwacalim10
Start: sudo systemctl start redis
Config: /etc/redis.conf