Ray - Distributed Computing Framework
Ray is an AI compute engine for scaling AI and Python applications. It consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
- Step 1
Overview
Ray is an open source distributed computing framework designed for scaling AI and Python applications. Created at UC Berkeley's RISELab, Ray provides a unified API for distributed task scheduling, stateful computation, and data parallelism.
Ray is widely used for:
- Machine learning model training at scale
- Hyperparameter tuning
- Reinforcement learning
- Batch inference
- Serving ML models
- Distributed data processing
Key components of the Ray ecosystem:
- Ray Core: Fundamental primitives (tasks, actors, objects) for distributed applications
- Ray Data: Distributed data processing for ETL and preprocessing
- Ray Train: Distributed training for machine learning models
- Ray Tune: Hyperparameter tuning and experiment management
- Ray Serve: Scalable model serving with autoscaling
- Ray RLlib: Production-ready reinforcement learning library
- Step 2
System Requirements
Ray officially supports:
- Platforms: Linux (x86_64, aarch64/ARM), macOS (Apple Silicon M1+), Windows (beta)
- Python: 3.8, 3.9, 3.10, 3.11, 3.12 (the supported range varies by Ray release; newer releases may drop older Python versions)
- Hardware: Ray runs on single machines and can scale to thousands of nodes across clouds
For ML workloads:
- GPU support via NVIDIA CUDA (recommended for deep learning)
- Optional: TPUs, AMD ROCm
- Sufficient RAM for model and data storage
- High-speed network for cluster deployments (10 GigE recommended)
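Before installing, a quick local check against the platform and Python requirements above can save a failed install. This is a minimal sketch using only the standard library. `check_environment` is a hypothetical helper, not part of Ray, and `SUPPORTED_PYTHON` simply mirrors the versions listed above; treat it as an assumption to update for the Ray release you install.

```python
import platform
import sys

# Assumption: mirrors the Python versions listed above; update for your Ray release.
SUPPORTED_PYTHON = {(3, 8), (3, 9), (3, 10), (3, 11), (3, 12)}


def check_environment(version=None, system=None, machine=None):
    """Return a list of problems; an empty list means the platform looks supported."""
    version = tuple(version or sys.version_info[:2])
    system = system or platform.system()      # e.g. "Linux", "Darwin", "Windows"
    machine = machine or platform.machine()   # e.g. "x86_64", "aarch64", "arm64"
    problems = []
    if version not in SUPPORTED_PYTHON:
        problems.append(f"Python {version[0]}.{version[1]} is not in the supported list")
    if system == "Linux" and machine not in {"x86_64", "aarch64"}:
        problems.append(f"Linux on {machine} is not officially supported")
    elif system == "Darwin" and machine != "arm64":
        problems.append("macOS support is listed for Apple Silicon only")
    elif system == "Windows":
        problems.append("Ray on Windows is in beta")
    elif system not in {"Linux", "Darwin", "Windows"}:
        problems.append(f"{system} is not a supported platform")
    return problems


if __name__ == "__main__":
    for problem in check_environment() or ["No problems found"]:
        print(problem)
```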
- Step 3
Installation (ML Applications)
Install Ray from PyPI. For machine learning applications, install Ray with the Data, Train, Tune, and Serve libraries; for reinforcement learning, install RLlib.
```bash
# Ray with the Data, Train, Tune, and Serve libraries
pip install -U "ray[data,train,tune,serve]"

# For reinforcement learning, install RLlib instead
pip install -U "ray[rllib]"

# Verify installation
ray --version
```
- Step 4
Installation (Python Applications)
For general distributed Python applications without ML dependencies.
```bash
# Ray with dashboard and cluster launcher
pip install -U "ray[default]"

# Minimal Ray (no dashboard, no cluster launcher)
pip install -U "ray"

# Verify installation
ray --version
```
- Step 5
Quick Start
Get started with Ray in a few steps. Ray will automatically initialize when you first use a Ray API.
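`ray.get(futures)` waits for every task before returning anything. When tasks finish at different times, Ray's `ray.wait(refs, num_returns=1)` returns the refs that are already ready, so you can process results as they complete. The sketch below shows that submit-then-consume-as-ready pattern with the standard library's `concurrent.futures` rather than Ray, so it runs without a cluster; `slow_add` and its delays are illustrative stand-ins for remote tasks.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_add(x, y, delay):
    # Stand-in for a remote task that takes `delay` seconds to finish
    time.sleep(delay)
    return x + y


def run_as_completed(delays=(0.3, 0.1, 0.2)):
    completion_order = []
    with ThreadPoolExecutor(max_workers=len(delays)) as pool:
        # Like add.remote(...): submit() returns a future immediately
        futures = [pool.submit(slow_add, i, i + 1, d) for i, d in enumerate(delays)]
        # Like a ray.wait() loop: handle each result as soon as it is ready.
        # In Ray this is roughly (process() is illustrative):
        #   pending = futures
        #   while pending:
        #       ready, pending = ray.wait(pending, num_returns=1)
        #       process(ray.get(ready[0]))
        for future in as_completed(futures):
            completion_order.append(future.result())
    # Like ray.get(futures): all results, in submission order
    in_order = [f.result() for f in futures]
    return completion_order, in_order


completion_order, in_order = run_as_completed()
print(completion_order)
print(in_order)  # [1, 3, 5]
```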
```python
import ray

# Initialize Ray (optional: happens automatically on first use)
ray.init()

# Simple distributed task
@ray.remote
def add(x, y):
    return x + y

# Submit tasks; each .remote() call returns an ObjectRef future immediately
futures = [add.remote(i, i + 1) for i in range(10)]

# Block until all tasks finish and fetch their results
results = ray.get(futures)
print(results)  # Output: [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]

# Shut down Ray
ray.shutdown()
```
- Step 6
Ray Actors (Stateful Workers)
Actors are stateful workers that maintain internal state between method calls.
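By default a Ray actor executes one method call at a time, and calls from a single caller run in the order they were submitted; that is why the `Counter` below can update `self.value` without locks. The following plain-Python model (not Ray's implementation) mimics that behavior with a single-threaded executor per object. `ActorHandleModel` is a hypothetical name used only for illustration.

```python
from concurrent.futures import Future, ThreadPoolExecutor


class ActorHandleModel:
    """Wraps an object so each method call is queued and runs on one dedicated thread."""

    def __init__(self, instance):
        self._instance = instance
        # A single worker thread: calls execute one at a time, in submission order
        self._executor = ThreadPoolExecutor(max_workers=1)

    def call(self, method_name, *args, **kwargs) -> Future:
        # Like counter.increment.remote(): returns a future immediately
        method = getattr(self._instance, method_name)
        return self._executor.submit(method, *args, **kwargs)

    def shutdown(self):
        self._executor.shutdown(wait=True)


class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value


counter = ActorHandleModel(Counter())
futures = [counter.call("increment") for _ in range(3)]
print([f.result() for f in futures])  # [1, 2, 3]
counter.shutdown()
```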
```python
import ray

ray.init()

# Define an actor class
@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def get_value(self):
        return self.value

# Create actor instance
counter = Counter.remote()

# Call actor methods; each call returns an ObjectRef
result = counter.get_value.remote()
print(ray.get(result))  # Output: 0

result = counter.increment.remote()
print(ray.get(result))  # Output: 1

result = counter.increment.remote()
print(ray.get(result))  # Output: 2

ray.shutdown()
```
- Step 7
Ray Data (Distributed Dataset Processing)
Ray Data provides a scalable Dataset API for distributed data processing. It's ideal for ETL, data preprocessing, and large-scale transformations.
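The difference between `map()` and `map_batches()` is the unit of work: `map()` calls your function once per row, while `map_batches()` calls it once per batch, passing a dict of columns (NumPy arrays by default) covering several rows. This plain-Python sketch models that contract with lists instead of NumPy arrays and no distribution, to show that both produce the same rows while the batch version makes far fewer function calls. `map_rows` and `map_batches_model` are hypothetical helpers, not Ray Data APIs.

```python
def map_rows(rows, fn):
    # map(): one call per row; each row is a dict
    return [fn(row) for row in rows]


def map_batches_model(rows, fn, batch_size):
    # map_batches(): one call per batch; each batch is a dict of column -> list
    out, calls = [], 0
    for start in range(0, len(rows), batch_size):
        chunk = rows[start:start + batch_size]
        batch = {col: [row[col] for row in chunk] for col in chunk[0]}
        result = fn(batch)
        calls += 1
        # Convert the column-oriented result back into rows
        n = len(next(iter(result.values())))
        out.extend({col: values[i] for col, values in result.items()} for i in range(n))
    return out, calls


def prefix_row(row):
    return {**row, "prefixed": "item_" + row["text"]}


def prefix_batch(batch):
    batch["prefixed"] = ["item_" + t for t in batch["text"]]
    return batch


rows = [{"text": f"word{i}"} for i in range(10)]
row_result = map_rows(rows, prefix_row)
batch_result, num_calls = map_batches_model(rows, prefix_batch, batch_size=4)
print(row_result == batch_result, num_calls)  # True 3
```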
```python
import numpy as np
import ray
from ray import data

ray.init()

# Create a dataset from a list of rows
ds = data.from_items([{"text": "hello"}, {"text": "world"}])

# Other sources (replace the placeholder paths with real ones):
# ds = data.read_json("/path/to/json/*.json")
# ds = data.read_csv("/path/to/csv/*.csv")
# ds = data.read_parquet("/path/to/parquet/")

# Row-wise transformation: keep the existing columns and add a new one
ds = ds.map(lambda row: {**row, "text_upper": row["text"].upper()})

# Batch transformation (more efficient); by default each batch is a dict of NumPy arrays
def add_prefix(batch):
    batch["prefixed"] = np.array(["item_" + t for t in batch["text"]])
    return batch

ds = ds.map_batches(add_prefix)

# Filter
ds = ds.filter(lambda row: len(row["text"]) > 3)

# Sort
ds = ds.sort("text")

# Inspect the row count and schema
print(ds.count())
print(ds.schema())

# Show sample
ds.show(5)

# Materialize as a pandas DataFrame
df = ds.to_pandas()

ray.shutdown()
```
- Step 8
Ray Train (Distributed Model Training)
Ray Train enables distributed training for deep learning models. It integrates with PyTorch, TensorFlow, and other frameworks.
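With `num_workers=4`, Ray Train runs the same training function on four workers. For PyTorch data-parallel training, each worker processes a different shard of the data and gradients are averaged across workers (an all-reduce) before every optimizer step, so all model replicas stay identical. The plain-Python sketch below (no Ray or PyTorch) checks the arithmetic behind that for a one-parameter linear model: averaging per-shard mean gradients over equal-sized shards gives exactly the full-batch gradient. The model, data, and sharding scheme are illustrative.

```python
def mse_grad(w, xs, ys):
    # d/dw of mean((w*x - y)^2) = mean(2*x*(w*x - y))
    return sum(2 * x * (w * x - y) for x, y in zip(xs, ys)) / len(xs)


def shard(seq, num_workers, rank):
    # Worker `rank` gets every num_workers-th element (a simple sharding scheme)
    return seq[rank::num_workers]


xs = [float(i) for i in range(8)]
ys = [3.0 * x + 1.0 for x in xs]
w, num_workers = 0.5, 4

full_batch = mse_grad(w, xs, ys)
per_worker = [
    mse_grad(w, shard(xs, num_workers, r), shard(ys, num_workers, r))
    for r in range(num_workers)
]
# What an all-reduce "average" computes across the workers' gradients
all_reduced = sum(per_worker) / num_workers

print(abs(full_batch - all_reduced) < 1e-9)  # True
```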
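```python
import os
import tempfile

import torch
from torchvision.models import resnet18

import ray
from ray import train
from ray.train import Checkpoint, ScalingConfig
from ray.train.torch import TorchTrainer, prepare_model

ray.init()

# Training function, executed on every worker
def train_func(config):
    model = resnet18(num_classes=10)
    # Move the model to this worker's device and wrap it for distributed data-parallel training
    model = prepare_model(model)

    # Per-worker context, e.g. this worker's rank in the training group
    rank = train.get_context().get_world_rank()

    for epoch in range(10):
        # Training logic here: iterate over a DataLoader wrapped with
        # ray.train.torch.prepare_data_loader(), compute the loss, step an optimizer.
        loss, accuracy = 0.5, 0.9  # placeholder metrics

        # Every worker reports metrics; only rank 0 attaches a checkpoint
        with tempfile.TemporaryDirectory() as tmpdir:
            checkpoint = None
            if rank == 0:
                torch.save(model.state_dict(), os.path.join(tmpdir, "model.pt"))
                checkpoint = Checkpoint.from_directory(tmpdir)
            train.report({"epoch": epoch, "loss": loss, "accuracy": accuracy}, checkpoint=checkpoint)

# Run distributed training
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=4,  # Number of training workers
        use_gpu=True,   # Requires one GPU per worker; set False to train on CPUs
    ),
)
result = trainer.fit()

print(f"Final metrics: {result.metrics}")
print(f"Latest checkpoint: {result.checkpoint}")

ray.shutdown()
```
- Step 9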
Ray Tune (Hyperparameter Tuning)
Ray Tune provides scalable hyperparameter optimization with support for multiple optimizers and early stopping.
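ASHA stops trials early at "rungs": iterations `grace_period`, `grace_period * reduction_factor`, `grace_period * reduction_factor**2`, and so on, up to `max_t`. At each rung, a trial continues only if its metric is roughly within the best `1 / reduction_factor` of the results recorded at that rung. The plain-Python sketch below is a simplified model of that rule, not Tune's implementation; `rung_milestones` and `should_continue` are hypothetical helpers. With the settings used in this step (`grace_period=2`, `reduction_factor=3`, `max_t=10`), trials are compared at iterations 2 and 6.

```python
import math


def rung_milestones(grace_period, reduction_factor, max_t):
    # Iterations at which trials are compared: grace_period * reduction_factor**k <= max_t
    milestones = []
    t = grace_period
    while t <= max_t:
        milestones.append(t)
        t *= reduction_factor
    return milestones


def should_continue(loss, recorded_losses, reduction_factor):
    # Simplified rule for mode="min": keep the trial if its loss falls within
    # the best 1/reduction_factor of all results seen at this rung (including its own).
    ranked = sorted(recorded_losses + [loss])
    keep = max(1, math.ceil(len(ranked) / reduction_factor))
    return loss <= ranked[keep - 1]


print(rung_milestones(grace_period=2, reduction_factor=3, max_t=10))  # [2, 6]
print(should_continue(0.15, [0.1, 0.2, 0.3, 0.4, 0.5], 3))  # True
print(should_continue(0.45, [0.1, 0.2, 0.3, 0.4, 0.5], 3))  # False
```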
```python
import ray
from ray import tune
from ray.train import RunConfig
from ray.tune.schedulers import ASHAScheduler

ray.init()

# Define training function with config
def train_model(config):
    for i in range(10):
        # Simulated training
        current_metric = config["lr"] * (i + 1)
        # Report metrics as a dict (older Ray 2.x releases use ray.train.report({...}) here)
        tune.report({"mean_loss": current_metric})

# Define search space
search_space = {
    "lr": tune.uniform(0.001, 0.01),
    "batch_size": tune.choice([32, 64, 128]),
    "num_layers": tune.randint(2, 10),
}

# Run tuning experiment
tuner = tune.Tuner(
    train_model,
    param_space=search_space,
    tune_config=tune.TuneConfig(
        num_samples=5,
        metric="mean_loss",
        mode="min",
        # metric and mode come from TuneConfig; also setting them on the scheduler raises an error
        scheduler=ASHAScheduler(max_t=10, grace_period=2, reduction_factor=3),
    ),
    run_config=RunConfig(
        name="hyperparameter_tuning",
        storage_path="/tmp/ray_results",
    ),
)
results = tuner.fit()

# Get best result
best_result = results.get_best_result(metric="mean_loss", mode="min")
print(f"Best config: {best_result.config}")
print(f"Best loss: {best_result.metrics['mean_loss']}")

ray.shutdown()
```
- Step 10
Ray Serve (Model Serving)
Ray Serve provides scalable, low-latency model serving with autoscaling, batching, and model versioning.
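Serve's request batching (the `@serve.batch` decorator, configured with `max_batch_size` and `batch_wait_timeout_s`) groups concurrent requests so the model runs once per batch instead of once per request. The asyncio sketch below models that idea without Serve: each caller enqueues an input and awaits a future, and a background loop collects up to `max_batch_size` inputs, or waits at most `timeout_s`, before calling the batch function. `MicroBatcher` is a hypothetical class, not Serve's implementation.

```python
import asyncio


class MicroBatcher:
    """Collects concurrent requests into batches (illustrative sketch)."""

    def __init__(self, batch_fn, max_batch_size=4, timeout_s=0.01):
        self.batch_fn = batch_fn              # list of inputs -> list of outputs
        self.max_batch_size = max_batch_size
        self.timeout_s = timeout_s
        self.queue = asyncio.Queue()
        self.batch_sizes = []                 # recorded for inspection

    async def submit(self, item):
        # Each caller waits on its own future, resolved by the batching loop
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((item, future))
        return await future

    async def run(self):
        loop = asyncio.get_running_loop()
        while True:
            # Block for the first request, then fill the batch until full or timed out
            batch = [await self.queue.get()]
            deadline = loop.time() + self.timeout_s
            while len(batch) < self.max_batch_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    batch.append(await asyncio.wait_for(self.queue.get(), remaining))
                except asyncio.TimeoutError:
                    break
            self.batch_sizes.append(len(batch))
            # One call to the "model" for the whole batch
            outputs = self.batch_fn([item for item, _ in batch])
            for (_, future), output in zip(batch, outputs):
                future.set_result(output)


async def main():
    batcher = MicroBatcher(lambda texts: [t.upper() for t in texts], max_batch_size=2)
    worker = asyncio.create_task(batcher.run())
    results = await asyncio.gather(*(batcher.submit(t) for t in ["a", "b", "c", "d", "e"]))
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return results, batcher.batch_sizes


results, batch_sizes = asyncio.run(main())
print(results)  # ['A', 'B', 'C', 'D', 'E']
print(batch_sizes)
```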
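```python
from starlette.requests import Request
from transformers import pipeline

import ray
from ray import serve

# Define the service
@serve.deployment
class TextGenerator:
    def __init__(self):
        self.pipeline = pipeline("text-generation", model="gpt2")

    def generate(self, text: str) -> dict:
        result = self.pipeline(text, max_length=50)
        return {"generated": result[0]["generated_text"]}

    def __call__(self, request: Request) -> dict:
        # HTTP entry point, e.g. GET http://localhost:8000/?text=The+weather+today
        return self.generate(request.query_params["text"])

# Configure the deployment
text_gen = TextGenerator.options(
    num_replicas=2,            # Number of replicas
    max_ongoing_requests=100,  # Replaces the deprecated max_concurrent_queries
)

ray.init()
# Optional: make the HTTP proxy listen on all interfaces (port 8000 is the default)
serve.start(http_options={"host": "0.0.0.0", "port": 8000})

# Deploy the application; serve.run returns a handle to the deployment
handle = serve.run(text_gen.bind())

# Run inference through the handle; .result() blocks until the response is ready
response = handle.generate.remote("The weather today")
print(response.result())
# Note: the application stops when this script exits; use the `serve run` CLI for a long-running server
```
- Step 11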
Ray RLlib (Reinforcement Learning)
RLlib is Ray's scalable reinforcement learning library, with implementations of algorithms such as PPO, DQN, SAC, and IMPALA.
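The `gamma=0.99` setting in the PPO configuration for this step is the discount factor: the return credited to a step is its reward plus `gamma` times the return of the next step, G_t = r_t + gamma * G_{t+1}. This short plain-Python sketch computes those discounted returns for one episode; RLlib's PPO combines `gamma` with generalized advantage estimation (GAE), which is more involved than this. `discounted_returns` is a hypothetical helper.

```python
def discounted_returns(rewards, gamma):
    # G_t = r_t + gamma * G_{t+1}, computed backwards from the end of the episode
    returns = []
    running = 0.0
    for reward in reversed(rewards):
        running = reward + gamma * running
        returns.append(running)
    return returns[::-1]


print(discounted_returns([1.0, 1.0, 1.0], gamma=0.5))  # [1.75, 1.5, 1.0]

# CartPole gives +1 per step; with gamma=0.99 the return of a long episode
# approaches 1 / (1 - 0.99) = 100 without ever reaching it
print(discounted_returns([1.0] * 500, gamma=0.99)[0])
```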
```python
import ray
from ray.rllib.algorithms.ppo import PPOConfig

ray.init()

# Build algorithm config
config = (
    PPOConfig()
    .environment("CartPole-v1")  # Gymnasium environment ID
    .training(gamma=0.99, lr=0.001, vf_loss_coeff=1.0, entropy_coeff=0.01)
    .env_runners(num_env_runners=4, num_envs_per_env_runner=4)
    .debugging(log_level="WARN")
)

# Build algorithm with config
algo = config.build()

# Train for 10 iterations
for _ in range(10):
    result = algo.train()
    # Result key names differ between Ray versions; this layout matches recent 2.x releases
    print(f"Mean episode return: {result['env_runners']['episode_return_mean']}")

# Save the model
checkpoint = algo.save("/tmp/my-checkpoint")
print(f"Saved checkpoint at: {checkpoint}")

ray.shutdown()
```
- Step 12
Local Cluster Setup
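Start a Ray cluster manually from the command line. The head command alone is enough for local development; run the worker command on each additional machine that should join:
```bash
# Start the head node with the dashboard reachable from other hosts
ray start --head --port=6379 --dashboard-host=0.0.0.0

# On each worker machine, join the cluster
ray start --address=<head-node-address>:6379

# Check cluster status
ray status

# View logs
ray logs

# Stop Ray on this machine
ray stop
```
- Step 13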
Cluster Configuration Files
For production deployments, describe your Ray cluster in a YAML configuration file and launch it with the Ray Cluster Launcher.
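The `min_workers` and `max_workers` fields in the configuration for this step bound how far the autoscaler can scale. As a rough mental model (the real autoscaler bin-packs pending resource requests across node types and also applies the global `max_workers` cap), the number of worker nodes of one type needed for a GPU demand is the demand divided by GPUs per node, rounded up, then clamped to those bounds. `workers_needed` is a hypothetical helper, not part of Ray.

```python
import math


def workers_needed(gpu_demand, gpus_per_node, min_workers, max_workers):
    # Nodes required to fit the demand, rounded up, then clamped to [min_workers, max_workers]
    required = math.ceil(gpu_demand / gpus_per_node)
    return max(min_workers, min(required, max_workers))


# One GPU per g4dn.xlarge worker, with worker bounds of 2 and 8 as in this step
for demand in (0, 5, 20):
    print(demand, workers_needed(demand, gpus_per_node=1, min_workers=2, max_workers=8))
# 0 2
# 5 5
# 20 8
```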
```yaml
# cluster.yaml (AWS example; region, ssh_user, and AMI ID are placeholders)
cluster_name: my-ray-cluster

# Global cap on the number of worker nodes
max_workers: 10

# Cloud provider settings
provider:
  type: aws
  region: us-west-2

auth:
  ssh_user: ubuntu  # depends on the AMI

# Node types the autoscaler can launch
available_node_types:
  ray.head.default:
    resources: {}
    node_config:
      InstanceType: g4dn.xlarge
      ImageId: ami-0abcdef123456
      BlockDeviceMappings:
        - DeviceName: /dev/sda1
          Ebs:
            VolumeSize: 100
  ray.worker.gpu:
    min_workers: 2
    max_workers: 8
    # g4dn.xlarge provides 4 vCPUs and 1 GPU
    resources: {"CPU": 4, "GPU": 1}
    node_config:
      InstanceType: g4dn.xlarge
      ImageId: ami-0abcdef123456
      BlockDeviceMappings:
        - DeviceName: /dev/sda1
          Ebs:
            VolumeSize: 100

head_node_type: ray.head.default

# Setup commands for all nodes
setup_commands:
  - pip install -U "ray[default,tune,serve,data]"

# Head node setup commands
head_setup_commands:
  - pip install torch torchvision

# Worker setup commands
worker_setup_commands:
  - pip install torch torchvision
  - echo "Worker node setup complete"
```
- Step 14
Cloud Deployment
Deploy Ray clusters on AWS, GCP, or Azure with the Ray Cluster Launcher. The cloud is selected by the `provider` section of cluster.yaml (`type: aws`, `gcp`, or `azure`), so the same commands work on every provider:
```bash
# Launch (or update) the cluster described in cluster.yaml
ray up cluster.yaml

# SSH into the head node
ray attach cluster.yaml

# Check cluster status from your local machine
ray exec cluster.yaml "ray status"

# Sync files to the cluster
ray rsync-up cluster.yaml /local/path /remote/path

# Tear the cluster down
ray down cluster.yaml --yes
```
- Step 15
Monitoring and Debugging
Ray provides a web dashboard for monitoring cluster health. By default it is available at http://localhost:8265 after running `ray start --head`. The dashboard includes:
- Nodes overview
- Job details
- Task execution
- GPU utilization
- Ray Data pipelines
- Serve deployments
```bash
# Start Ray with the dashboard listening on all interfaces
ray start --head --dashboard-host=0.0.0.0 --dashboard-port=8265
# Then open http://localhost:8265 in a browser

# Check cluster info
ray status

# Attach to a launcher-managed cluster for debugging
ray attach cluster.yaml

# Sync files and run commands on the cluster
ray rsync-up cluster.yaml /local/path /remote/path
ray exec cluster.yaml "pip install my-package"
```
- Step 16
Advanced Configuration
Fine-tune Ray behavior with init options:
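```python
import ray

# Start a new local Ray instance with custom options.
# To connect to an existing cluster instead, pass address="auto" (or "localhost:6379")
# and omit the resource and dashboard arguments, which only apply to a new instance.
ray.init(
    # Logical resources this node advertises to the scheduler
    num_cpus=4,
    num_gpus=2,
    # Object store size in bytes (2 GB); ray.init has no public `memory` argument
    object_store_memory=2 * 10**9,
    # Dashboard
    include_dashboard=True,
    dashboard_host="0.0.0.0",
    dashboard_port=8265,
    # Logging: forward worker output to the driver
    log_to_driver=True,
    # Don't raise if ray.init() was already called
    ignore_reinit_error=True,
    # Namespace for named actors and jobs
    namespace="default",
)

# Resources currently free in the cluster
resources = ray.available_resources()
print(resources)  # e.g. includes 'CPU', 'GPU', 'memory', and 'object_store_memory' keys
```
- Step 17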
Best Practices and Pitfalls
Best Practices:
- Use batch processing: `map_batches()` processes many rows per call, so vectorized transformations usually run faster than with row-by-row `map()`
- Leverage actors for state: Reuse actor instances rather than recreating them
- Configure resources properly: Set `num_cpus`/`num_gpus` on tasks and actors to match what they actually use, and don't overallocate GPU memory
- Use checkpointing: Save checkpoints during long training runs
- Monitor your cluster: Use the Ray Dashboard to identify bottlenecks
- Batch requests: For inference, batch similar requests to improve throughput
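Ray's `num_cpus`/`num_gpus` requests are logical: the scheduler places a task or actor only on a node whose remaining resources cover the request, and it does not enforce limits on physical usage. A quick way to size requests is to compute how many copies fit on a node at once. This is a simplified single-node model, not Ray's scheduler; `max_concurrent` is a hypothetical helper.

```python
def max_concurrent(node_resources, request):
    # How many copies of a request fit on one node at the same time:
    # the tightest resource (smallest available/requested ratio) decides.
    fits = [
        int(node_resources.get(name, 0) // amount)
        for name, amount in request.items()
        if amount > 0
    ]
    # A request for no resources is not limited by this node
    return min(fits) if fits else float("inf")


node = {"CPU": 4, "GPU": 2}  # e.g. ray.init(num_cpus=4, num_gpus=2)
print(max_concurrent(node, {"CPU": 1}))              # 4
print(max_concurrent(node, {"CPU": 1, "GPU": 1}))    # 2
print(max_concurrent(node, {"CPU": 1, "GPU": 0.5}))  # 4
print(max_concurrent(node, {"CPU": 2, "GPU": 0.5}))  # 2
```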
Common Pitfalls:
- ObjectRef confusion: ObjectRefs are not the actual data; use `ray.get()` to retrieve the value
- Memory pressure: Large datasets can cause out-of-memory (OOM) errors; process them in batches
- Actor deadlocks: Avoid holding locks across async calls
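- GIL limitations: Ray tasks and actors run in separate worker processes, so they do not contend for one GIL, but threads inside a single worker (for example, a threaded actor with `max_concurrency` > 1) still share that worker's GIL; use native extensions or more processes for compute-intensive work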
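- Serialization overhead: Serializing large Python objects can be slow; prefer NumPy arrays, which the object store shares between workers on the same node without copying, and use `ray.put()` to store a large object once instead of passing it to every task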
⚠ Heads up: Always test your distributed code locally first before scaling to a cluster. Use `ray.init()` without arguments for local development.