ElasticSearch第四章(数据同步和集群)

news/2024/7/7 5:59:57 标签: elasticsearch, 大数据, 搜索引擎

目录

1:ES数据同步

1.1:为什么进行数据同步

1.2:数据同步到RocketMQ

2:ES集群

2.1:三台服务器搭建集群

2.2:ES的集群节点角色

2.2:ES集群服务的搭建


1:ES数据同步

1.1:为什么进行数据同步

因为ES中的是数据来自业务数据也就是数据库,比如mysql等,那么就需要把数据库的数据同步到ES中,那么什么时候进行数据同步呢?

1:直接同步:我们可以在进行mysql的crud的时候,直接把数据插入ES,缺点是增加操作时间,并且跟业务耦合,增加耗时

2:间接同步:我们可以在进行mysql的crud的时候,把数据发送到消息中间件,比如MQ,减少业务耦合,增加相应时间,削峰填谷

1.2:数据同步到RocketMQ

1:pom.xml导入jar,这里是springboot3.X、ES8.X、RocketMQ5.X 版本

 <!--springboot帮我们自动集成了es,所以只需要引入下面这一个依赖即可-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.uzaygezen</groupId>
            <artifactId>uzaygezen-core</artifactId>
            <version>0.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.google.uzaygezen</groupId>
            <artifactId>uzaygezen-core</artifactId>
            <version>0.2</version>
            <scope>compile</scope>
        </dependency>

        <!--springboot3.X   rocketmq-spring整合-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
        </dependency>

2:解决RockeMQ5.X和SpringBoot3.X的兼容想问题

地址:Spring boot 3.0整合RocketMQ及不兼容的问题_rocketmq springboot3-CSDN博客

3:配置RocketMQConfig

@Configuration
public class RocketMQConfig {

    @Value("${rocketmq.producer.group}")
    private String producerGroup;

    @Value("${rocketmq.name-server}")
    private String nameServer;

    /**
     * 由于使用的Spring版本是3.0.0以上,与rocketMq不是很兼容,对于rocketMqTemplate
     * 的自动注入存在差异,如果不采用这种方式注入则会报出缺少bean的信息
     */
    @Bean
    public RocketMQTemplate rocketMQTemplate(){
        System.out.println("producerGroup:"+producerGroup);
        System.out.println("nameServer:"+nameServer);
        RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
        //设置默认的DefaultMQProducer
        DefaultMQProducer defaultMqProducer = new DefaultMQProducer();
        defaultMqProducer.setProducerGroup(producerGroup);
        defaultMqProducer.setNamesrvAddr(nameServer);

        rocketMQTemplate.setProducer(defaultMqProducer);
        return rocketMQTemplate;
    }

}

4:配置springboot的yml

#rocketmq的配置
rocketmq:
  name-server: localhost:9876
  producer:
    group: producer_group1
  consumer:
    topic: topic26

5:消息生产者发送消息

 @Autowired
    RocketMQTemplate rocketMQTemplate;

    @Value("${rocketmq.consumer.topic}")
    private String topic;

    @Value(value = "${rocketmq.consumer.topic1}:${rocketmq.consumer.topic2}")
    private String syncTag;
    //发现消息到RocketMQ
    @Override
    public void sendMQ(TbHotel tbHotel) throws JsonProcessingException {

        //发送json
        ObjectMapper objectMapper=new JsonMapper();
        String json = objectMapper.writeValueAsString(tbHotel);
        System.out.println("发送数据:"+json);

        rocketMQTemplate.convertAndSend(topic,json);
    }

6:消息接受者接收消息 

@Service
@RocketMQMessageListener(
        topic = "topic26",
        consumerGroup = "consumerGroup26"
)
public class RocketMQConsumer1 implements RocketMQListener<String> {

    @Autowired
    ElasticsearchClient elasticsearchClient;

    @Override
    public void onMessage(String message) {
        System.out.println("9002接受的消息是:"+message);
        //将json转换为对象
        ObjectMapper objectMapper=new ObjectMapper();
        try {
            TbHotel hotel = objectMapper.readValue(message, TbHotel.class);
            System.out.println("实体转换:"+hotel);

            //数据库查询到一条数据
            TbHotelDoc hotelDoc=new TbHotelDoc(hotel);
            System.out.println(hotelDoc);

            //数据插入ES 无论修改数据还是增加数据都是 插入
            IndexRequest indexRequest=new IndexRequest.Builder<TbHotelDoc>()
                    .id(hotel.getId().toString())
                    .index("hotel")
                    .document(hotelDoc).build();
            elasticsearchClient.index(indexRequest);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2:ES集群

2.1:三台服务器搭建集群

在ES中存在单机处理上限,以及单机的稳定性,所以集群的搭建是必不可少的

假如我们搭建三台服务器作为集群9200,9201,9202

2.2:ES的集群节点角色

在我们搭建的三个节点构成的集群中,每个节点都是相同的类型,都具有如图的特点

1:都是候选主节点

2:都具有数据节点的功能:数据局存储、搜索、聚合、crud

3:都是ingest,具有数据存储预处理功能

这有什么缺点呢?

假如新增id=1、2、3的数据 。数据都会连接到主节点,主节点根据hash取模,然后将数据插入节点1、2、3中(查询同理)。这里就可以看出来,主节点具备记录集群状态,决定分片在那个节点的主节点功能,还得具有数据的存储、搜索功能,也就是上边的三种类型,都具备。当数据量很大的时候,crud的时候,主节点压力很大哦。这就是不划分职责的情况。

在大公司中,集群可能有几百个节点,那么所有的crud都要经过主节点hsah,主节点太累,所以需要把节点按照权限划分,架构出来新的架构

大型ES集群的架构如下

ES集群新增数据如下图:

ES集群查询数据如下图:

2.2:ES集群服务的搭建

1:这里没有自己搭建,参考网上搭建案例

2:不使用kinaba来连接集群,而是cerebo软件来连接集群

3: cerebo控制台查看集群

4:创建索引的DSL


http://www.niftyadmin.cn/n/5459585.html

相关文章

kubuntu23.10安装sdl2及附加库和 sfml2.5.1

2024年3月28号&#xff0c;四&#xff0c;晚上kubuntu23.10下安装了sdl2的如下&#xff0c;没有安装gfx。 sudo apt install libsdl2-dev sudo apt install libsdl2-image-dev sudo apt install libsdl2-ttf-dev sudo apt install libsdl2-mixer-dev sudo apt install libsdl2…

【笔记】Nginx配置类似Tomcat请求接口链路access_log日志

项目部署在tomcat容器中&#xff0c;请求的接口会被记录在文件名&#xff1a;localhost_access_log.2024-03-22.log的文件中&#xff0c;如果使用Nginx也需要记录请求接口&#xff0c;该如何做呢&#xff1f;步骤如下 步骤1&#xff1a; 打开nginx.conf&#xff0c;在 http 块中…

工作常用设计模式

设计模式分类 创建者模式&#xff08;5种&#xff09; 单例模式原型模式工厂方法模式抽象工厂模式建造者模式 结构型模式&#xff08;7种&#xff09; 代理模式适配器模式桥接模式装饰者模式外观模式享元模式组合模式 行为型模式&#xff08;11种&#xff09; 模板方法模…

k8s小白的学习初体验

前言 有些时候的巧合让人匪夷所思&#xff0c;前两周刚刚尝试了一遍Docker操作&#xff0c;紧接着就收到好朋友说要学习k8s容器部署的建议&#xff0c;最近两周抽空看了一些关于k8s的知识&#xff0c;相关概念真的是太多了&#xff0c;概念本身是枯燥的&#xff0c;但是当概念…

数字孪生在农业中应用

生系统采用信息物理融合的思想&#xff0c;首先基于物联网技术完成植物工厂生产系统的数字化镜像&#xff0c;其次基于数字植物系统完成多源异构数据的智能化分析&#xff0c;进一步基于孪生数据完成应对未来情景的智能化决策&#xff0c;最后基于植物工厂机器人完成决策的自动…

【MySQL】mysql数据库小功能整理,持续更新~

目录 1、把从数据库中查询出的两个字段拼接 2、自定义新字段 1、把从数据库中查询出的两个字段拼接 在ThinkPHP中使用 field 查询数据库字段时&#xff0c;使用数据库自带的CONCAT函数使两个字段拼接成一个新的自定义字段。 示例&#xff1a; 有两个字段 number 和 filenam…

FastAPI+React全栈开发08 安装MongoDB

Chapter02 Setting Up the Document Store with MongoDB 08 Installing MongoDB and friends FastAPIReact全栈开发08 安装MongoDB The MongoDB ecosystem is composed of different pieces of software, and I remember that when I was starting to play with it, there w…

单元测试(UT)用例简介

单元测试&#xff08;Unit Testing, UT&#xff09;用例是一系列预先设计好的、针对软件最小可测试单元的测试场景。每一个单元测试用例都是为了验证一个独立代码单元&#xff08;如函数、方法、类&#xff09;的行为是否符合预期。这些用例通常包含以下几个关键组成部分&#…