Skip to content

分布式相关

随着业务规模的不断扩大,单体应用已经无法满足高并发、高可用的需求。分布式系统通过将应用拆分到多个节点上运行,能够有效提升系统的性能和可靠性。本章节将介绍分布式系统中的核心技术,包括分布式事务、消息队列、缓存和搜索引擎。

分布式事务

在分布式系统中,事务可能涉及多个服务或数据库,这就带来了分布式事务的挑战。

分布式事务概念

分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。

CAP理论

CAP理论指出,一个分布式系统最多只能同时满足以下三个特性中的两个:

  1. 一致性(Consistency):所有节点在同一时间具有相同的数据
  2. 可用性(Availability):保证每个请求不管成功或者失败都有响应
  3. 分区容错性(Partition tolerance):系统中任意信息的丢失或失败不会影响系统的继续运作

在分布式系统中,分区容错性是必须满足的,因此只能在一致性和可用性之间做出权衡。

BASE理论

BASE是Basically Available(基本可用)、Soft state(软状态)和Eventually consistent(最终一致性)的缩写,是对CAP理论中AP方案的延伸。

分布式事务解决方案

1. 两阶段提交(2PC)

两阶段提交是最常见的分布式事务解决方案:

java
// 事务管理器
@Component
public class TransactionManager {
    private List<TransactionParticipant> participants = new ArrayList<>();
    
    public void registerParticipant(TransactionParticipant participant) {
        participants.add(participant);
    }
    
    public void beginTransaction() {
        // 第一阶段:准备阶段
        for (TransactionParticipant participant : participants) {
            if (!participant.prepare()) {
                rollback();
                throw new RuntimeException("事务准备失败");
            }
        }
        
        // 第二阶段:提交阶段
        commit();
    }
    
    private void commit() {
        for (TransactionParticipant participant : participants) {
            participant.commit();
        }
    }
    
    private void rollback() {
        for (TransactionParticipant participant : participants) {
            participant.rollback();
        }
    }
}

// 事务参与者接口
public interface TransactionParticipant {
    boolean prepare();
    void commit();
    void rollback();
}

2. TCC(Try-Confirm-Cancel)

TCC是一种柔性事务解决方案:

java
public interface AccountService {
    // Try阶段:检查并预留资源
    boolean tryDeduct(String accountId, BigDecimal amount);
    
    // Confirm阶段:确认执行
    boolean confirmDeduct(String accountId, BigDecimal amount);
    
    // Cancel阶段:取消执行
    boolean cancelDeduct(String accountId, BigDecimal amount);
}

3. Saga模式

Saga模式通过一系列本地事务来实现分布式事务:

java
@Component
public class OrderSagaOrchestrator {
    
    @Autowired
    private OrderService orderService;
    
    @Autowired
    private PaymentService paymentService;
    
    @Autowired
    private InventoryService inventoryService;
    
    public void createOrder(OrderRequest request) {
        // 创建订单
        Order order = orderService.createOrder(request);
        
        try {
            // 扣减库存
            inventoryService.deductInventory(request.getProductId(), request.getQuantity());
            
            // 执行支付
            paymentService.processPayment(order.getId(), request.getAmount());
            
            // 确认订单
            orderService.confirmOrder(order.getId());
        } catch (Exception e) {
            // 补偿操作
            compensate(order, request);
            throw e;
        }
    }
    
    private void compensate(Order order, OrderRequest request) {
        // 取消支付
        paymentService.cancelPayment(order.getId());
        
        // 恢复库存
        inventoryService.restoreInventory(request.getProductId(), request.getQuantity());
        
        // 取消订单
        orderService.cancelOrder(order.getId());
    }
}

4. Seata框架

Seata是阿里巴巴开源的分布式事务解决方案:

java
@GlobalTransactional
public void purchase(Product product, int quantity) {
    // 扣减库存
    storageService.decrease(product.getId(), quantity);
    
    // 创建订单
    orderService.createOrder(product.getId(), quantity);
    
    // 执行支付
    accountService.decreaseMoney(order.getAmount());
}

消息队列

消息队列是分布式系统中重要的组件,用于解耦应用、异步处理和流量削峰。

消息队列概念

消息队列是一种应用程序间的通信方法,通过在消息队列中存储消息来实现应用程序之间的异步通信。

RabbitMQ

RabbitMQ是实现了AMQP(高级消息队列协议)的开源消息代理软件。

基本概念

  1. Producer(生产者):发送消息的应用程序
  2. Consumer(消费者):接收消息的应用程序
  3. Queue(队列):存储消息的缓冲区
  4. Exchange(交换机):接收生产者发送的消息并将其路由到一个或多个队列
  5. Binding(绑定):交换机和队列之间的关联

四种交换机类型

  1. Direct Exchange(直连交换机):根据Routing Key完全匹配
  2. Fanout Exchange(扇出交换机):将消息广播到所有绑定的队列
  3. Topic Exchange(主题交换机):根据Routing Key模式匹配
  4. Headers Exchange(头交换机):根据消息头属性匹配

Spring Boot集成RabbitMQ

java
// 配置类
@Configuration
@EnableRabbit
public class RabbitConfig {
    
    @Bean
    public Queue orderQueue() {
        return new Queue("order.queue", true);
    }
    
    @Bean
    public TopicExchange orderExchange() {
        return new TopicExchange("order.exchange");
    }
    
    @Bean
    public Binding binding(Queue orderQueue, TopicExchange orderExchange) {
        return BindingBuilder.bind(orderQueue).to(orderExchange).with("order.#");
    }
}

// 生产者
@Service
public class OrderProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendOrder(Order order) {
        rabbitTemplate.convertAndSend("order.exchange", "order.created", order);
    }
}

// 消费者
@Component
public class OrderConsumer {
    
    @RabbitListener(queues = "order.queue")
    public void handleOrder(Order order) {
        System.out.println("处理订单: " + order.getId());
        // 处理订单逻辑
    }
}

Kafka

Kafka是一个分布式流处理平台,具有高吞吐量、可持久化等特点。

基本概念

  1. Producer(生产者):发布消息到Kafka集群
  2. Consumer(消费者):从Kafka集群读取消息
  3. Broker(代理):Kafka集群中的服务器
  4. Topic(主题):消息的分类
  5. Partition(分区):Topic的分区,用于并行处理
  6. Offset(偏移量):消息在分区中的位置

Spring Boot集成Kafka

java
// 配置
@Configuration
@EnableKafka
public class KafkaConfig {
    
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

// 生产者
@Service
public class KafkaProducer {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

// 消费者
@Component
public class KafkaConsumer {
    
    @KafkaListener(topics = "order-topic")
    public void listen(String message) {
        System.out.println("收到消息: " + message);
        // 处理消息
    }
}

缓存技术

缓存是提高系统性能的重要手段,通过将热点数据存储在高速存储介质中,减少对后端数据库的访问。

Redis

Redis是一个开源的内存数据结构存储系统,支持多种数据结构。

Redis数据结构

  1. String(字符串):最基本的数据类型
  2. Hash(哈希):键值对集合
  3. List(列表):字符串列表
  4. Set(集合):无序不重复元素集合
  5. ZSet(有序集合):有序不重复元素集合

Spring Boot集成Redis

java
// 配置
@Configuration
@EnableCaching
public class RedisConfig {
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}

// 缓存使用
@Service
public class UserService {
    
    @Autowired
    private UserRepository userRepository;
    
    @Cacheable(value = "users", key = "#id")
    public User getUserById(Long id) {
        return userRepository.findById(id)
                .orElseThrow(() -> new UserNotFoundException("用户不存在"));
    }
    
    @CacheEvict(value = "users", key = "#user.id")
    public User updateUser(User user) {
        return userRepository.save(user);
    }
    
    @CacheEvict(value = "users", key = "#id")
    public void deleteUser(Long id) {
        userRepository.deleteById(id);
    }
}

// 手动操作Redis
@Service
public class CacheService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    public void set(String key, Object value, long timeout) {
        redisTemplate.opsForValue().set(key, value, timeout, TimeUnit.SECONDS);
    }
    
    public Object get(String key) {
        return redisTemplate.opsForValue().get(key);
    }
    
    public boolean delete(String key) {
        return redisTemplate.delete(key);
    }
}

缓存策略

  1. Cache-Aside Pattern(旁路缓存模式)

    • 读取时先查缓存,缓存未命中再查数据库
    • 更新时先更新数据库,再删除缓存
  2. Read-Through/Write-Through(读写穿透)

    • 缓存层负责与数据库交互
    • 应用程序只与缓存层交互
  3. Write-Behind(异步缓存写入)

    • 先更新缓存,异步批量更新数据库

搜索引擎

搜索引擎用于实现全文检索功能,在电商、内容系统中广泛应用。

Elasticsearch

Elasticsearch是一个基于Lucene的分布式搜索引擎。

核心概念

  1. Index(索引):文档的集合,类似于数据库
  2. Type(类型):文档的分类,类似于表(7.x版本后废弃)
  3. Document(文档):存储的基本单元,类似于行
  4. Field(字段):文档中的数据项,类似于列
  5. Mapping(映射):定义文档字段的类型和属性
  6. Shard(分片):索引的水平分割单元
  7. Replica(副本):分片的备份

Spring Boot集成Elasticsearch

java
// 实体类
@Document(indexName = "products")
public class Product {
    @Id
    private String id;
    
    @Field(type = FieldType.Text, analyzer = "ik_max_word")
    private String name;
    
    @Field(type = FieldType.Text, analyzer = "ik_max_word")
    private String description;
    
    @Field(type = FieldType.Double)
    private Double price;
    
    @Field(type = FieldType.Keyword)
    private String category;
    
    // getter和setter省略
}

// Repository
@Repository
public interface ProductRepository extends ElasticsearchRepository<Product, String> {
    
    List<Product> findByNameContaining(String name);
    
    List<Product> findByCategoryAndPriceBetween(String category, Double minPrice, Double maxPrice);
}

// Service
@Service
public class ProductService {
    
    @Autowired
    private ProductRepository productRepository;
    
    public Product saveProduct(Product product) {
        return productRepository.save(product);
    }
    
    public List<Product> searchProducts(String keyword) {
        return productRepository.findByNameContaining(keyword);
    }
    
    public List<Product> searchByCategoryAndPrice(String category, Double minPrice, Double maxPrice) {
        return productRepository.findByCategoryAndPriceBetween(category, minPrice, maxPrice);
    }
}

实践练习

  1. 实现一个分布式订单系统,使用Seata处理分布式事务
  2. 创建消息驱动的用户通知系统,使用RabbitMQ或Kafka
  3. 构建高性能的商品查询系统,集成Redis缓存
  4. 开发商品搜索功能,使用Elasticsearch实现全文检索

总结

分布式系统是现代互联网应用的基础架构,掌握分布式相关技术对于Java开发者至关重要。通过本章节的学习,你应该掌握了:

  1. 分布式事务的概念和解决方案(2PC、TCC、Saga、Seata)
  2. 消息队列的原理和使用(RabbitMQ、Kafka)
  3. 缓存技术的应用(Redis)和缓存策略
  4. 搜索引擎的使用(Elasticsearch)

在实际项目中,需要根据业务场景选择合适的技术方案,并注意处理分布式系统中的常见问题,如数据一致性、容错性、监控等。