WhatsApp

WhatsApp 定时消息发送任务调度系统架构:Redis + Celery 方案


WhatsApp 定时消息发送任务调度系统架构:Redis + Celery 方案

痛点描述

在构建 WhatsApp 的定时消息发送功能时,面临多个挑战:

  • 高并发处理:用户可能会在短时间内发送大量定时消息,传统的单线程处理方式无法满足需求。
  • 可靠性:定时任务可能因系统故障而丢失,需要保证任务的持久性和可靠性。
  • 延迟:用户对消息的发送时间有严格要求,延迟过高会影响用户体验。
  • 可扩展性:随着用户数量增长,系统需要能够平滑扩展。

核心逻辑

WhatsApp 定时消息发送任务调度系统架构采用 Redis 作为消息队列,Celery 作为任务调度工具。整体架构如下:

  1. 任务生成:用户在 WhatsApp 中设置定时消息,系统将该任务存储到 Redis。
  2. 任务调度:Celery 定期从 Redis 中获取待处理的任务并执行。
  3. 消息发送:Celery Worker 执行任务,调用 WhatsApp API 发送消息。

工作流程

  1. 用户创建定时消息,系统将其保存到 Redis 数据库。
  2. Celery 定时任务会定期检查 Redis,查找即将到达发送时间的消息。
  3. 当检测到任务到达发送时间,Celery Worker 通过 WhatsApp API 发送消息。
  4. 系统记录任务执行状态,确保消息已成功发送。

Python 代码示例

以下是使用 Python 实现 WhatsApp 定时消息发送的基本框架。

from celery import Celery
import redis
from datetime import datetime, timedelta
import requests

# 初始化 Redis 和 Celery
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
app = Celery('tasks', broker='redis://localhost:6379/0')

# 定义消息发送任务
@app.task
def send_whatsapp_message(phone_number, message):
    url = "https://api.whatsapp.com/send"  # 替换为实际 WhatsApp API 地址
    data = {
        'phone': phone_number,
        'message': message
    }
    response = requests.post(url, json=data)
    return response.json()

# 定时任务添加示例
def schedule_message(phone_number, message, delay_seconds):
    send_time = datetime.now() + timedelta(seconds=delay_seconds)
    redis_client.zadd('scheduled_messages', {f'{phone_number}:{message}': send_time.timestamp()})

# Celery 定时任务
@app.task
def check_scheduled_messages():
    current_time = datetime.now().timestamp()
    messages = redis_client.zrangebyscore('scheduled_messages', 0, current_time)
    
    for msg in messages:
        phone_number, message = msg.decode('utf-8').split(':')
        send_whatsapp_message.delay(phone_number, message)
        redis_client.zrem('scheduled_messages', msg)  # 删除已处理的任务

JS 代码示例

在前端,可以使用 AJAX 向后端 API 请求定时消息的创建。

function scheduleMessage(phoneNumber, message, delaySeconds) {
    const payload = {
        phone_number: phoneNumber,
        message: message,
        delay_seconds: delaySeconds
    };
    
    fetch('/api/schedule_message', {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json'
        },
        body: JSON.stringify(payload)
    })
    .then(response => response.json())
    .then(data => console.log('Message scheduled:', data))
    .catch(error => console.error('Error:', error));
}

高级优化建议

  1. 消息去重:在 Redis 中使用唯一标识符防止重复任务,避免多次发送相同消息。
  2. 任务重试机制:在 Celery 中配置重试策略,确保由于临时故障导致的任务失败能够重新尝试。
  3. 监控与报警:集成监控工具(如 Prometheus)和报警机制,实时监测任务执行情况。
  4. 负载均衡:利用多个 Celery Worker 实例平衡负载,提高并发处理能力。
  5. 数据持久化:使用 Redis 的持久化功能(RDB/AOF),确保数据在系统重启后不会丢失。
方案优点缺点
传统定时器简单易用不支持高并发,可靠性差
Redis高性能,支持并发,持久化任务需要额外集成
Celery强大的任务调度,支持重试机制学习曲线陡峭
Redis + Celery结合了两者优点,适用于大规模系统系统复杂度增加

结论

通过采用 WhatsApp 定时消息发送任务调度系统架构:Redis + Celery 方案,您可以有效地解决高并发、可靠性和延迟等问题。该架构不仅具备高性能和可扩展性,还能保证消息的准确发送。

如果您在集成过程中遇到复杂的架构问题,欢迎咨询 apianswer.com 技术团队。

本文由 ApiAnswer 原创。我们在 API 集成、自动化流程和 Telegram Bot 开发领域拥有丰富经验。

遇到技术瓶颈? 获取专家支持