基于SpringCloudAlibaba设计开发的直播平台项目

26k words

1. 技术栈总结

数据库 缓存 网关 消息队列 注册/配置中心 基础框架 容器技术
MySQL Caffeine Gateway RocketMQ Nacos SpringBoot docker
Redis Dubbo
MybatisPlus
ShardingJDBC
Netty

2. 基础架构搭建

2.1. 微服务架构模式

  1. 代理微服务模式
  2. 聚合器模式
  3. 链式微服务模式(错误)
  4. 分支微服务模式
  5. 数据共享模式
  6. 异步消息传递模式

2.2. 框架对比

2.2.1. Dubbo

核心特点

  • 高性能RPC调用
  • 弹性架构
  • 服务治理
    • 流量分配转发
    • 可视化监控平台
    • 微服务生态完善
    • 开始支持服务网格技术

2.2.2. SpringCloud

核心特点

提供一套通用的分布式系统基础组件,支持服务治理、服务注册与发现,配置中心,负载均衡,监控跟踪等。

2.2.3. SpringCloudAlibaba

致力于提供微服务开发的一站式解决方案。

HTP–>RPC,性能比SpringCloud要高。

2.3. 环境搭建

docker latest
docker-mysql 8.0.27
docker-redis latest
RocketMQ 4.8.0
JDK 1.8.0_212
nacos 2.2.3

一堆坑,但是踩完这些坑对网络连接的认识比起以往深刻多了。

2.3.1. Docker

Docker 教程 | 菜鸟教程 (runoob.com)

Docker overview | Docker Docs

Docker安装MySQL,Redis

Docker 安装 MySQL | 菜鸟教程 (runoob.com)

Docker 安装 Redis | 菜鸟教程 (runoob.com)

redis.conf

#注释掉这部分,这是限制redis只能本地访问
bind 127.0.0.1

#默认yes,开启保护模式,限制为本地访问
protected-mode no

#默认no,改为yes意为以守护进程方式启动,可后台运行,除非kill进程,改为yes会使配置文件方#式启动redis失败
daemonize no

#redis持久化(可选)
appendonly yes

#设置密码
requirepass 123456 

#启动命令
docker run -p 6379:6379 --name redis \
-v /usr/local/redis.conf:/etc/redis/redis.conf \
-v /usr/local/data:/data \
-d redis redis-server /etc/redis/redis.conf \
--appendonly yes

# 启动容器的时候,并为其设置密码
docker run -d --name myredis -p 6379:6379 redis --requirepass "123456"

# 通过容器id,进入redis
docker exec -it CONTAINER_ID /bin/bash

# 运行redis客户端
redis-cli

# 查看redis的密码
config get requirepass

# 设置redis的密码
config set requirepass yourPassword

# 认证
auth yourPassword

2.3.2. JDK1.8

链接:https://pan.baidu.com/s/1XVVQ67HuMpnhM7TN5BjVKg?pwd=sjeo
提取码:sjeo

2.3.3. Maven

在Linux系统中安装并配置maven详细教程_linux安装maven-CSDN博客

2.3.4. RocketMQ

快速开始 | RocketMQ (apache.org)

启动

cd /usr/local/RocketMQ-4.8.0/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/
nohup sh /bin/mqnamesrcv >/dev/null 2>&1 &

2.3.5. Nacos

Nacos 快速开始

需要开放7848、8848、9848、9849端口

2.4. Docker的使用

容器技术的流行与服务的架构演变有关。

2.4.1. Docker的特点

  • 不需要运行额外的OS(省内存)
  • 部署方便快速
  • 资源相互隔离

2.4.2. Docker的常用操作

创建、部署容器
docker run -d -p [对外端口]:[内部端口] [images name]
创建镜像
docker create [images name]
部署容器
docker start [images id]
进入容器 
docker exec -it [container id] /bin/bash
退出容器
exit
停止容器
docker stop [container id]
删除容器
docker rm [container id]
容器信息
docker inspect [container id]
容器日志
docker logs [container id]
容器状态监控
docker stats [container id]
限制容器下载速度
docker update -m -100m --memory-swap -1 [container id]

2.4.3. Docker的底层原理

一文读懂容器三大核心技术——Namespace,Cgroup和UnionFS-CSDN博客

2.4.3.1. NameSpace

Namespaces是一种实现不同进程间资源隔离的机制,不同的Namespaces程序之间的资源相互独立。

2.4.3.2. Cgroups

全称Control Groups,是Linux内核提供的物理资源隔离机制,通过这种机制,可以实现对Linux进程或者进程组的资源限制、隔离和统计功能。


3. 技术选型

3.1. RPC

RPC是什么,看完你就知道了 - 知乎 (zhihu.com)

Rpc和Http的区别 - 知乎 (zhihu.com)

RPC产品

  1. Dubbo

  2. gRPC

  3. brpc

  4. Thrift

3.1.1. Dubbo

resources

spring:
application:
  name:yun-livestreaming-user-provider
cloud:
nacos:
  username:
  password:
discovery:
  server-addr: xx.xx.xx.xx:8848
  namespace: [从nacos里copy的命名空间ID]
dubbo.application.name=yun-livestreaming-user-provider
dubbo.registry.address=nacos://localhost:8848?namespace=[从nacos里copy的命名空间ID]
dubbo.server=true
dubbo.protocol.name=dubbo
dubbo.protocol.port=9090

启动类

package org.yun.livestreaming.user.provider;

import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@SpringBootApplication
@EnableDubbo
@EnableDiscoveryClient
public class UserProviderApplication {
    public static void main(String[] args) {
        SpringApplication springApplication = new SpringApplication(UserProviderApplication.class);
        springApplication.setWebApplicationType(WebApplicationType.NONE);
        springApplication.run(args);
    }
}

dubbo:reference | Apache Dubbo

高级特性和用法 | Apache Dubbo

(3)Dubbo启动时qos-server can not bind localhost:22222错误解决_qos-server can not bind localhost:22222, dubbo ver-CSDN博客

3.1.1.1. Dubbo的底层原理

服务暴露原理

对export函数的源代码进行深入研究,一直到NettyServer的启动。

服务调用原理

通过ReferenceConfig的get函数返回一个代理对象,然后在代理对象中发送请求,最后抵达DubboInvoker中,将请求放入一条队列中,再由一个异步线程去消费这条队列的数据进行发送消息。

3.2. 架构演变下的技术变化

3.2.1. MySQL的架构变化

  1. 单机

    • 承载能力不足
  2. 主从

    • 主从延迟

3.2.2. Redis的架构变化

  1. 单机

    • 容量有限,单点风险
  2. 哨兵/主从

    • 容量有限
  3. 集群

    • 机器成本高,存在指令重定向可能

3.3. 用户中台

多个接入方,共同使用一个用户中台体系。

用户中台业务特点

  • 数据源、代码的统一管理
  • 方便不同业务线的接入
  • 高性能、高并发、高可用的服务
  • 维护成本较高

MySQL读压力如何解决?

读压力分担给从节点,从库做横向扩容。

MySQL主从架构下有什么问题?

主从延迟。

Redis存储空间不足

传统的主从、哨兵两种架构都是单点存储,如果存储数据过多,可以考虑使用分片集群架构。

Redis分片集群下有什么问题?

片键重定向,部分指令失效。

流量增加的处理手段

面对流量的增加,核心的处理手段就是横向扩容,所以在进行架构设计的时候需要考虑到这一点。

4. 用户服务中台

用户数据存储分析

  1. 设计的时候尽量做到冷热字段分离;
  2. 分析哪些属性是唯一的,哪些属性是一对多的;
  3. 用户数据量预估;
  4. 存储选型:一般用关系型数据库(结构化关系明显;技术成本相对较低)

4.1. MySQL的分库分表

分库+分表的特点

分表分散在不同的数据库中

  • 数据库连接充足
  • 不能做联表查询
  • 跨数据库的事务操作完成不了

如何选择

连接数充足:分表

连接数不足:分库分表

4.1.1. ShardingJDBC

一款轻量级的Java框架,对原先的JDBC框架做了封装,可以兼容各类常用的ORM框架。

路由

将原先只访问单表的SQL,改写为可以访问分库分表模式下的SQL。

  1. 直接路由
  2. 标准路由
  3. 笛卡尔积路由
  4. 全库路由
  5. 全实例路由
  6. 全库表路由
  7. 单播路由
  8. 阻断路由

使用ShardingJDBC后,尽量使用简单查询类型的SQL,少用分组查询聚合函数。对于分页查询要谨慎使用,避免产生全表扫描的情况。

4.1.1.1 ShardingJDBC的实现

ShardingJDBC的配置

yun-livestreaming-user-provider/pom.xml
        <mybatis-plus.version>3.5.3</mybatis-plus.version>
        <sharding.jdbc.version>5.3.2</sharding.jdbc.version>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${yun-mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>shardingsphere-jdbc-core</artifactId>
            <version>${sharding.jdbc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>
        <dependency>
yun-livestreaming-user-provider/src/main/resources/yun-db-sharding.yaml

dataSources:
  # 新表,重建的分表
  user_master:
    dataSourceClassName: com.zaxxer.hikari.HikariDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://host:8808/yun_livestreaming_user?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC
    username: root
    password: admin
  # 新表,重建的分表
  user_slave0:
    dataSourceClassName: com.zaxxer.hikari.HikariDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://host:8809/yun_livestreaming_user?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC
    username: root
    password: admin

# 读写分离,写的时候往master写,读的时候在slave读
rules:
  - !READWRITE_SPLITTING
    dataSources:
      user_ds:
        staticStrategy:
          writeDataSourceName: user_master
          readDataSourceNames:
            - user_slave0
  - !SINGLE
    defaultDataSource: user_ds  # 不分表分分库的默认数据源
  - !SHARDING
    tables:
      t_user:
        actualDataNodes: user_ds.t_user_${(0..99).collect(){ it.toString().padLeft(2,'0') } }
        tableStrategy:
          standard:
            shardingColumn: user_id
            shardingAlgorithmName: t_user-inline
    shardingAlgorithms:
      t_user-inline:
        type: INLINE
        props:
          algorithm-expression: t_user_${(user_id %100).toString().padLeft(2,'0')}
props:
  sql-show: true
yun-livestreaming-user-provider/src/main/resources/application.yml

spring:
  application:
    name: yun-livestreaming-user-provider
  datasource:
    driver-class-name: org.apache.shardingsphere.driver.ShardingSphereDriver
    url: jdbc:shardingsphere:classpath:yun-db-sharding.yaml
    hikari:
      pool-name: yun-user-pool
      minimum-idle: 150
      maximum-pool-size: 300
      idle-timeout: 60000
      connection-timeout: 4000
      max-lifetime: 60000


dubbo:
  application:
    name: ${spring.application.name}
  registry:
    address: nacos://host:8848?namespace=610d99d1-3f09-4bdd-adb2-a09368e0d59a
  protocol:
    name: dubbo
    port: 9091
yun-livestreaming-user-provider/src/main/resources/bootstrap.yml

spring:
  application:
    name: yun-livestreaming-user-provider
  cloud:
    nacos:
      discovery:
        server-addr: :8848
        namespace: 610d99d1-3f09-4bdd-adb2-a09368e0d59a

实现单个增改查的前置操作

yun-livestreaming-user-provider/src/main/java/org/yun/livestreaming/user/provider/dao/po/UserPO.java

//创建属性。
yun-livestreaming-user-interface/src/main/java/org/yun/livestreaming/user/dto/UserDTO.java

public class UserDTO implements Serializable {

    @Serial
    private static final long serialVersionUID = 4540721678683662252L;
    private Long userId;
    private String nickName;
    private String trueName;
    private String avatar;
    private Integer sex;
    private Integer workCity;
    private Integer bornCity;
    private Date bornDate;
    private Date createTime;
    private Date updateTime;

    构造器......
yun-livestreaming-common-interface/src/main/java/org/yun/livestreaming/common/interfaces/ConvertBeanUtils.java
    

/**
 * @Author YunSheng
 * @Create 2023/11/10 9:59
 * @Description Bean工具类
 */
public class ConvertBeanUtils {
    /**
     * 将一个对象转成目标对象
     *
     * @param source
     * @param targetClass
     * @param <T>
     * @return
     */
    public static <T> T convert(Object source, Class<T> targetClass) {
        if (source == null) {
            return null;
        }
        T t = newInstance(targetClass);
        BeanUtils.copyProperties(source, t);
        return t;
    }

    /**
     * 将List对象转换成目标对象,注意实现是ArrayList
     *
     * @param targetClass
     * @param <K>
     * @param <T>
     * @return
     */
    public static <K, T> List<T> convertList(List<K> sourceList, Class<T> targetClass) {
        if (sourceList == null) {
            return null;
        }
        List targetList = new ArrayList((int) (sourceList.size() / 0.75) + 1);
        for (K source : sourceList) {
            targetList.add(convert(source, targetClass));
        }
        return targetList;
    }

    private static <T> T newInstance(Class<T> targetClass) {
        try {
            return targetClass.newInstance();
        } catch (Exception e) {
            throw new BeanInstantiationException(targetClass, "instantiation error", e);
        }
    }
}
yun-livestreaming-user-provider/pom.xml

        <dependency>
            <groupId>org.idea</groupId>
            <artifactId>yun-livestreaming-common-interface</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

实现增改查

yun-livestreaming-user-interface/src/main/java/org/yun/livestreaming/user/interfaces/IUserRpc.java
    
public interface IUserRpc {
    /**
     * @Author YunSheng
     * @Create 2023/11/10 10:36
     * @Description 根据用户ID进行查询
     */
    UserDTO getByUserID(Long userId);

    /**
     * @Author YunSheng
     * @Create 2023/11/10 10:56
     * @Description 用户信息更新
     */
    boolean updateUserInfo(UserDTO userDTO);

    /**
     * @Author YunSheng
     * @Create 2023/11/10 11:02
     * @Description 插入用户信息
     */
    boolean insertOne(UserDTO userDTO);
}
yun-livestreaming-user-provider/src/main/java/org/yun/livestreaming/user/provider/service/IUserService.java
    
/**
 * @Author YunSheng
 * @Create 2023/11/10 9:23
 */
public interface IUserService {
    /**
     * @Author YunSheng
     * @Create 2023/11/10 10:36
     * @Description 根据用户ID进行查询
     */
    UserDTO getByUserID(Long userId);

    /**
     * @Author YunSheng
     * @Create 2023/11/10 10:56
     * @Description 用户信息更新
     */
    boolean updateUserInfo(UserDTO userDTO);

    /**
     * @Author YunSheng
     * @Create 2023/11/10 11:02
     * @Description 插入用户信息
     */
    boolean insertOne(UserDTO userDTO);
}
yun-livestreaming-user-provider/src/main/java/org/yun/livestreaming/user/provider/service/impl/UserServiceImpl.java
    
/**
 * @Author YunSheng
 * @Create 2023/11/10 9:24
 */
@Service
public class UserServiceImpl implements IUserService {
    @Resource
    private IUserMapper userMapper;

    @Override
    public UserDTO getByUserID(Long userId) {
        if (userId == null) {
            return null;
        }
        return ConvertBeanUtils.convert(userMapper.selectById(userId), UserDTO.class);
    }

    @Override
    public boolean updateUserInfo(UserDTO userDTO) {
        if (userDTO == null || userDTO.getUserId() == null) {
            return false;
        }
        userMapper.updateById(ConvertBeanUtils.convert(userDTO, UserPO.class));
        return true;
    }

    @Override
    public boolean insertOne(UserDTO userDTO) {
        if (userDTO == null || userDTO.getUserId() == null) {
            return false;
        }
        userMapper.insert(ConvertBeanUtils.convert(userDTO, UserPO.class));
        return true;
    }
}
yun-livestreaming-user-provider/src/main/java/org/yun/livestreaming/user/provider/rpc/UserRpcImpl.java

/**
 * @Author YunSheng
 * @Create 2023/11/9 8:59
 */
@DubboService
public class UserRpcImpl implements IUserRpc {
    @Resource
    private IUserService userService;

    @Override
    public UserDTO getByUserID(Long userId) {
        return userService.getByUserID(userId);
    }

    @Override
    public boolean updateUserInfo(UserDTO userDTO) {
        return userService.updateUserInfo(userDTO);
    }

    @Override
    public boolean insertOne(UserDTO userDTO) {
        return userService.insertOne(userDTO);
    }
}
yun-livestreaming-api/src/main/java/org/yun/livestreaming/api/controller/UserController.java

/**
 * @Author YunSheng
 * @Create 2023/11/9 9:44
 */

@RestController
@RequestMapping("/user")
public class UserController {
    @DubboReference
    private IUserRpc userRpc;

    @GetMapping("/getUserInfo")
    public UserDTO getUserInfo(Long userId) {
        return userRpc.getByUserID(userId);
    }

    @GetMapping("/updateUserInfo")
    public boolean updateUserInfo (Long userId, String nickname) {
        UserDTO userDTO = new UserDTO();
        userDTO.setUserId(userId);
        userDTO.setNickName(nickname);
        return userRpc.updateUserInfo(userDTO);
    }

    @GetMapping("/insertOne")
    public boolean insertOne(Long userId) {
        UserDTO userDTO = new UserDTO();
        userDTO.setUserId(userId);
        userDTO.setNickName("test");
        return userRpc.insertOne(userDTO);
    }

}

4.1.2. 主从架构下读写分离

数据库

CREATE DATABASE yun_livestreaming_user CHARACTER 
SET utf8mb3 COLLATE = utf8_bin;

DELIMITER $$
CREATE PROCEDURE yun_livestreaming_user.create_t_user_100 () BEGIN
    DECLARE
        i INT;
    DECLARE
        table_name VARCHAR ( 30 );
    DECLARE
        table_pre VARCHAR ( 30 );
    DECLARE
        sql_text VARCHAR ( 3000 );
    DECLARE
        table_body VARCHAR ( 2000 );
    
    SET i = 0;
    
    SET table_name = '';
    
    SET sql_text = '';
    
    SET table_body = '(
    user_id bigint NOT NULL DEFAULT -1 COMMENT \'用户 id\',
    nick_name varchar(35) DEFAULT NULL COMMENT \'昵称\',
    avatar varchar(255) DEFAULT NULL COMMENT \'头像\',
    true_name varchar(20) DEFAULT NULL COMMENT \'真实姓名\',
    sex tinyint(1) DEFAULT NULL COMMENT \'性别 0 男,1 女\',
    born_date datetime DEFAULT NULL COMMENT \'出生时间\',
    work_city int(9) DEFAULT NULL COMMENT \'工作地\',
    born_city int(9) DEFAULT NULL COMMENT \'出生地\',
    create_time datetime DEFAULT CURRENT_TIMESTAMP,
    update_time datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE 
    CURRENT_TIMESTAMP,
    PRIMARY KEY (user_id)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb3 
    COLLATE=utf8_bin;';
    WHILE
            i < 100 DO
        IF
            i < 10 THEN
                
                SET table_name = CONCAT( 't_user_0', i );
            ELSE 
                SET table_name = CONCAT( 't_user_', i );
            
        END IF;
        
        SET sql_text = CONCAT( 'CREATE TABLE ', table_name, table_body );
        SELECT
            sql_text;
        
        SET @sql_text = sql_text;
        PREPARE stmt 
        FROM
            @sql_text;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;
        
        SET i = i + 1;
        
    END WHILE;

    END$$
DELIMITER;

本地远程连接云服务器上Docker下的MySQL | 云升的小窝 (blog-yunsheng.cn)

在Windows宿主机中连接虚拟机中的Docker容器总结_虚拟容器内应用怎么访问-CSDN博客

MySQL主从架构配置

linux

# 创建主从数据库文件夹
mkdir -p /usr/local/mysql/master1/conf
mkdir -p /usr/local/mysql/master1/data
mkdir -p /usr/local/mysql/slave1/conf
mkdir -p /usr/local/mysql/slave1/data
# 初始化主数据库配置文件
cd /usr/local/mysql/master1/conf
vi my.cnf
# 粘贴以下内容
[mysqld]
datadir = /usr/local/mysql/master1/data
character-set-server = utf8
lower-case-table-names = 1
# 主从复制-主机配置# 主服务器唯一 ID
server-id = 1
# 启用二进制日志
log-bin=mysql-bin
# 设置 logbin 格式
binlog_format = STATEMENT
# 初始化从数据库配置文件
cd /usr/local/mysql/slave1/conf
vi my.cnf
# 粘贴以下内容
[mysqld]
datadir = /usr/local/mysql/slave1/data
character-set-server = utf8
lower-case-table-names = 1
# 主从复制-从机配置# 从服务器唯一 ID
server-id = 2
# 启用中继日志
relay-log = mysql-relay
# 文件夹授权
chmod -R 777 /usr/local/mysql
linux-docker

docker run --name=mysql-master-1 \
--privileged=true \
-p 8808:3306 \
-v /usr/local/mysql/master1/data/:/var/lib/mysql \
-v /usr/local/mysql/master1/conf/my.cnf:/etc/mysql/my.cnf \
-v /usr/local/mysql/master1/mysql-files/:/var/lib/mysql-files/ \
-e MYSQL_ROOT_PASSWORD=admin \
-d mysql:8.0.27 --lower_case_table_names=1

docker run --name=mysql-slave-1 \
--privileged=true \
-p 8809:3306 \
-v /usr/local/mysql/slave1/data/:/var/lib/mysql \
-v /usr/local/mysql/slave1/conf/my.cnf:/etc/mysql/my.cnf \
-v /usr/local/mysql/slave1/mysql-files/:/var/lib/mysql-files/ \
-e MYSQL_ROOT_PASSWORD=admin \
-d mysql:8.0.27 --lower_case_table_names=1

docker exec -it mysql-master-1 /bin/bash
- 主数据库创建用户 slave 并授权

# 创建用户,设置主从同步的账户名
create user 'yun-slave'@'%' identified with mysql_native_password by 'yun-pwd';
# 授权
grant replication slave on *.* to 'yun-slave'@'%';
# 刷新权限
flush privileges;
# 查询 server_id 值
show variables like 'server_id';
# 也可临时(重启后失效)指定 server_id 的值(主从数据库的 server_id 不能相同)
set global server_id = 1;
# 查询 Master 状态,并记录 File 和 Position 的值,这两个值用于和下边的从数据库中的 change 那条 sql 中的master_log_file,master_log_pos 参数对齐使用
show master status;
# 重置下 master 的 binlog 位点
reset master;
- 进入从数据库

# 注意:执行完此步骤后退出主数据库,防止再次操作导致 File 和 Position 的值发生变化
# 验证 slave 用户是否可用

# 查询 server_id 值
show variables like 'server_id';

# 也可临时(重启后失效)指定 server_id 的值(主从数据库的 server_id 不能相同)
set global server_id = 2;

# 若之前设置过同步,请先重置
stop slave;
reset slave;
# 设置主数据库
change master to master_host='localhost',master_port=8808,master_user='yun-slave',master_password='yun-pwd',master_log_file='binlog.000001',master_log_pos=156;
# 开始同步
start slave;
# 若出现错误,则停止同步,重置后再次启动
stop slave;
reset slave;
start slave;
# 查询 Slave 状态
show slave status;

服务器启动后自动初始化数据库

yun-livestreaming-framework/yun-livestreaming-framework-datasource-starter/src/main/java/org/idea/yun/livestreaming/framework/datasource/starter/config/ShardingJdbcDatasourceAutoInitConnectionConfig.java
    
package org.idea.yun.livestreaming.framework.datasource.starter.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;
import java.sql.Connection;


/**
 * @Author YunSheng
 * @Create 2023/11/13 11:04
 * @Description
 */
@Configuration
public class ShardingJdbcDatasourceAutoInitConnectionConfig {

    private static final Logger LOGGER =
            LoggerFactory.getLogger(ShardingJdbcDatasourceAutoInitConnectionConfig.class);

    @Bean
    public ApplicationRunner runner(DataSource dataSource) {
        return args -> {
            LOGGER.info(" ================== [ShardingJdbcDatasourceAutoInitConnectionConfig]dataSource:{} ", dataSource);
            //手动触发下连接池的连接创建
            Connection connection = dataSource.getConnection();
        };
    }
}

4.1.3. Redis

降低对MySQL数据库的访问压力,提升系统查询性能。

4.1.3.1. Redis的实现

单个查询引入Redis提升查询速率

yun-livestreaming-framework/yun-livestreaming-framework-redis-starter/src/main/java/org/idea/yun/livestreaming/framework/redis/starter/config/RedisConfig.java

/**
 * @Author YunSheng
 * @Create 2023/11/13 13:05
 * @Description
 */
@Configuration
@ConditionalOnClass(RedisTemplate.class)
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        IGenericJackson2JsonRedisSerializer valueSerializer = new IGenericJackson2JsonRedisSerializer();
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setValueSerializer(valueSerializer);
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        redisTemplate.setHashValueSerializer(valueSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}
yun-livestreaming-framework/yun-livestreaming-framework-redis-starter/src/main/java/org/idea/yun/livestreaming/framework/redis/starter/config/MapperFactory.java

import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.springframework.cache.support.NullValue;
import org.springframework.util.StringUtils;

import java.io.IOException;

/**
 * @Author YunSheng
 * @Create 2023/11/13 13:12
 * @Description
 */

public class MapperFactory {

    public static ObjectMapper newInstance() {
        return initMapper(new ObjectMapper(), (String) null);
    }

    private static ObjectMapper initMapper(ObjectMapper mapper, String classPropertyTypeName) {
        mapper.registerModule(new SimpleModule().addSerializer(new MapperNullValueSerializer(classPropertyTypeName)));
        if (StringUtils.hasText(classPropertyTypeName)) {
            mapper.enableDefaultTypingAsProperty(DefaultTyping.NON_FINAL, classPropertyTypeName);
        } else {
            mapper.enableDefaultTyping(DefaultTyping.NON_FINAL, As.PROPERTY);
        }
        mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        return mapper;
    }


    /**
     * {@link StdSerializer} adding class information required by
     * default typing. This allows de-/serialization of
     * {@link NullValue}.
     *
     * @author Christoph Strobl
     * @since 1.8
     */
    private static class MapperNullValueSerializer extends StdSerializer<NullValue> {
        private static final long serialVersionUID = 1999052150548658808L;
        private final String classIdentifier;

        /**
         * @param classIdentifier can be {@literal null} and will
         *                        be defaulted to {@code @class}.
         */
        MapperNullValueSerializer(String classIdentifier) {
            super(NullValue.class);
            this.classIdentifier = StringUtils.hasText(classIdentifier) ? classIdentifier : "@class";
        }

        /*
        * (non-Javadoc)
        * @see
       com.fasterxml.jackson.databind.ser.std.StdSerializer#serialize(jav
       a.lang.Object, com.fasterxml.jackson.core.JsonGenerator,
       com.fasterxml.jackson.databind.SerializerProvider)
        */
        @Override
        public void serialize(NullValue value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
            jgen.writeStartObject();
            jgen.writeStringField(classIdentifier, NullValue.class.getName());
            jgen.writeEndObject();
        }
    }
}
yun-livestreaming-framework/yun-livestreaming-framework-redis-starter/src/main/java/org/idea/yun/livestreaming/framework/redis/starter/config/IGenericJackson2JsonRedisSerializer.java

import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;

/**
 * @Author YunSheng
 * @Create 2023/11/13 13:14
 * @Description
 */
public class IGenericJackson2JsonRedisSerializer extends GenericJackson2JsonRedisSerializer {
    public IGenericJackson2JsonRedisSerializer() {
        super(MapperFactory.newInstance());
    }

    @Override
    public byte[] serialize(Object source) throws SerializationException {
        if (source != null && ((source instanceof String) || (source instanceof Character))) {
            return source.toString().getBytes();
        }
        return super.serialize(source);
    }
}
yun-livestreaming-framework/yun-livestreaming-framework-redis-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports

org.idea.yun.livestreaming.framework.redis.starter.config.RedisConfig
org.idea.yun.livestreaming.framework.redis.starter.key.UserProviderCacheKeyBuilder
yun-livestreaming-user-provider/src/main/java/org/yun/livestreaming/user/provider/service/impl/UserServiceImpl.java

    @Resource
    private RedisTemplate<String, UserDTO> redisTemplate;

    @Resource
    private UserProviderCacheKeyBuilder userProviderCacheKeyBuilder;

    @Override
    public UserDTO getByUserID(Long userId) {
        if (userId == null) {
            return null;
        }
        String key = userProviderCacheKeyBuilder.buildUserInfoKey(userId);
        UserDTO userDTO = redisTemplate.opsForValue().get(key);
        if (userDTO != null) {
            return userDTO;
        }
        userDTO = ConvertBeanUtils.convert(userMapper.selectById(userId), UserDTO.class);
        if (userDTO != null) {
            redisTemplate.opsForValue().set(key, userDTO);
        }
        return userDTO;
    }
spring:  
  data:
    redis:
      port: 6379
      host: 
      lettuce:
        pool:
          min-idle: 10
          max-active: 50
          max-idle: 20
      password: 

批量查询的实现

yun-livestreaming-user-provider/src/main/java/org/yun/livestreaming/user/provider/service/impl/UserServiceImpl.java

    @Override
    public Map<Long, UserDTO> batchQueryUserInfo(List<Long> userIdList) {
        if (CollectionUtils.isEmpty(userIdList)) {
            return Maps.newHashMap();
        }
        userIdList = userIdList.stream().filter(id -> id > 10000).collect(Collectors.toList());
        if (CollectionUtils.isEmpty(userIdList)) {
            return Maps.newHashMap();
        }

        // redis
        List<String> keyList = new ArrayList<>();
        userIdList.forEach(userId -> {
            keyList.add(userProviderCacheKeyBuilder.buildUserInfoKey(userId));
        });
        List<UserDTO> userDTOList = redisTemplate.opsForValue().multiGet(keyList).stream().filter(x -> x != null).collect(Collectors.toList());
        if (!CollectionUtils.isEmpty(userDTOList) && userDTOList.size() == userIdList.size()) {
            return userDTOList.stream().collect(Collectors.toMap(UserDTO::getUserId, x -> x));
        }
        List<Long> userIdInCacheList = userDTOList.stream().map(UserDTO::getUserId).collect(Collectors.toList());
        List<Long> userIdNotInCacheList = userIdList.stream().filter(x -> !userIdInCacheList.contains(x)).collect(Collectors.toList());

        // 多线程查询
        Map<Long, List<Long>> userIdMap = userIdNotInCacheList.stream().collect(Collectors.groupingBy(userId -> userId % 100));
        List<UserDTO> dbQueryResult = new CopyOnWriteArrayList<>();
        userIdMap.values().parallelStream().forEach(queryUserIdList -> {
            dbQueryResult.addAll(ConvertBeanUtils.convertList(userMapper.selectBatchIds(queryUserIdList), UserDTO.class));
        });
        if (!CollectionUtils.isEmpty(dbQueryResult)) {
            Map<String, UserDTO> saveCacheMap = dbQueryResult.stream().collect(Collectors.toMap(userDTO -> userProviderCacheKeyBuilder.buildUserInfoKey(userDTO.getUserId()), x -> x));
            redisTemplate.opsForValue().multiSet(saveCacheMap);
            userDTOList.addAll(dbQueryResult);
        }
        return userDTOList.stream().collect(Collectors.toMap(UserDTO::getUserId, x -> x));
    }

测试成功

image-20231114113038787

4.1.3.2. Redis过期时间设置

回顾Redis可能出现的一些问题:计算机开发综合知识总结(偏Java方向)

// 以管道的形式(网络IO开销较少)实现过期时间
redisTemplate.executePipelined(new SessionCallback<Object>() {
    @Override
    public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
        for (String redisKey : saveCacheMap.keySet()) {
            operations.expire((K) redisKey, createRandomExpireTime(), TimeUnit.SECONDS);
        }
        return null;
    }
});

// 单个查询中的设置
redisTemplate.opsForValue().set(key, userDTO, 30, TimeUnit.MINUTES);

4.1.3.3. 高并发场景下缓存和数据库一致性问题

理解先MySQL与先Redis为什么都有可能会出现数据不一致的问题。原因是多个进程之间的读写不同步问题。

在CSDN看到了一篇很好的讲解文章:为什么需要分布式ID?_为什么要使用分布式id_南大白的博客-CSDN博客

解决方法

不推荐用加锁解决,会导致性能糟糕。

不推荐写时加载缓存,数据一旦写入混乱,得依靠下一次的写操作去恢复。

  • 读时加载缓存:更新MySQL后,删除掉缓存。等到读的时候再同步到缓存。

在更新和删除缓存之间另一进程也有可能从Redis读取到脏数据,但是概率较低,具体看业务需求是否容忍。

最终一致性

性能和一致性不可兼得,引入缓存的目的都是为了提升效率,最终都会妥协到最终一致性来解决。

例如:

  • 利用数据库日志订阅+消息队列保证数据更新
  • 通过延时双删来解决主从同步延时

理解!理解!理解!背是背不下来滴,多想想吧,不难理解但是怕忘了,温故而知新。

4.1.4. 分布式ID

场景分析

用户注册后,需要分配一个userId,其需要具备的特性有:

  1. 唯一标识
  2. 无规律,随机性强(防止窃听,获知用户增量等信息)

4.1.4.1. UUID机制

算法的核心思想是结合机器的网卡、当地时间、一个随机数来生成UUID。

  • 优点:本地生成,生成简单,性能好,没有高可用风险;
  • 缺点:长度过长,无序不可读,对于MySQL的查询不方便,且容易造成叶子节点裂变。

4.1.4.2. Redis自增ID

Redis的所有命令操作都是单线程的,本身提供像incr和increby这样的自增原子命令,所以能保证生成的ID肯定是唯一有序的。

  • 优点:不依赖于数据库,灵活方便,且性能优于数据库,数字ID天然排序,对分页或者需要排序的结果很有帮助;
  • 缺点:存放在Redis当中,如果内存淘汰策略选择不当,会导致丢失,不支持非连续性ID生成。

4.1.4.3. 数据库自增ID

使用数据库的ID自增策略,如MySQL的auto_increment。

并且可用使用两台数据库分表设置不同步长,生成不重复ID的策略来实现高可用。

  • 优点:数据库生成的ID绝对有序,高可用实现方式简单;
  • 缺点:需要独立部署数据库实例,成本高,有性能瓶颈。

4.1.4.4. 雪花算法

Twitter利用Zookeeper实现了一个全局ID生成的服务Snowflake,不同的位代表了不同的含义。

  • 优点:高性能,低延迟,按时间有序,一般不会造成ID碰撞;
  • 缺点:需要独立的开发和部署,依赖于机器的时钟。

4.2. 用户标签

4.2.1. 常见的用户标签实现思路

  1. 基于MongoDB做标签记录

    • 使用JSON格式去做标签记录的存放
    • 需要额外搭建MongoDB集群
  2. 基于MySQL做标签记录

    • 使用二进制的思路去存储标签信息(本项目采用方法)

    • 当业务增加标签类型的时候,不需要对表结构进行调整

4.2.1.1. 标签记录的实现原理

设置标签

  0 0 0 0 1(用户原先就有记录一个标签)
| 1 0 0 0 0
------------------------------
  1 0 0 0 1(用户记录上标签后,存入数据库)

取消标签

  1 0 0 0 1 (用户原先就有记录两个标签)
~ 1 0 0 0 0
------------------------------
  0 1 1 1 1 
& 1 0 0 0 1
------------------------------
  0 0 0 0 1 (取消了用户的数字为 16 的那个标签)

注:关于二进制运算基础在计算机开发综合知识总结(偏Java方向)中有详细讲解。

Comments