【ES实战】ES创建Transports客户端时间过长分析

news/2024/7/7 5:50:34 标签: elasticsearch

ES创建Transports客户端时间过长分析

2023年10月19日

文章目录

  • ES创建Transports客户端时间过长分析
    • 问题描述
    • 问题重现
    • 问题分析
      • 是否可以配置链接超时时间
      • 节点建立连接超时逻辑
      • 为啥超时间会出现翻倍
    • 优化方案

在创建ES Transport客户端的时,当出现以下场景时,影响连接速度。

问题描述

  1. 使用ES Transport 客户端创建与集群的链接。
  2. 连接地址里面有不存在的IP
  3. 在增加ES节点时,采用逐个增加的方式

整个建立链接的过程会非常耗时。

问题重现

采用jar依赖如下

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>5.6.16</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>x-pack-transport</artifactId>
            <version>5.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>sniffer</artifactId>
            <version>5.4.2</version>
        </dependency>

创建连接代码如下

        final Settings settings = Settings.builder()
                .put("cluster.name", "common-es")
                .put("client.transport.sniff", true).build();
        final TransportClient transportClient = new PreBuiltXPackTransportClient(settings);
        long t1 = System.currentTimeMillis();
        transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.37"), 9800));
        logger.info("第1个错误节点耗时:" + (System.currentTimeMillis() - t1) / 1000);
        long t2 = System.currentTimeMillis();
        transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.38"), 9800));
        logger.info("第2个错误节点耗时:" + (System.currentTimeMillis() - t2) / 1000);
        long t3 = System.currentTimeMillis();
        transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.39"), 9800));
        logger.info("第3个错误节点耗时:" + (System.currentTimeMillis() - t3) / 1000);

输出结果

[2023-10-19 15:31:31,398] [main] [INFO ] xxx.Client - 第1个错误节点耗时:21
[2023-10-19 15:32:13,414] [main] [INFO ] xxx.Client - 第2个错误节点耗时:42
[2023-10-19 15:32:55,436] [main] [INFO ] xxx.Client - 第3个错误节点耗时:42

问题分析

是否可以配置链接超时时间

通过new PreBuiltXPackTransportClient()方法创建客户端,跟踪源码发现其会在TransportClient.buildTemplate进行建立网络模块服务,在继续debug,会发现会在TcpTransport中方法buildDefaultConnectionProfile构建链接的配置文件。发现其TCP_CONNECT_TIMEOUT默认的配置是30s,起对应的配置参数是transport.tcp.connect_timeout

    static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
        int connectionsPerNodeRecovery = CONNECTIONS_PER_NODE_RECOVERY.get(settings);
        int connectionsPerNodeBulk = CONNECTIONS_PER_NODE_BULK.get(settings);
        int connectionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings);
        int connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings);
        int connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings);
        ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
        // 链接的超时时间
        builder.setConnectTimeout(TCP_CONNECT_TIMEOUT.get(settings));
        builder.setHandshakeTimeout(TCP_CONNECT_TIMEOUT.get(settings));
        builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
        builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
        // if we are not master eligible we don't need a dedicated channel to publish the state
        builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
        // if we are not a data-node we don't need any dedicated channels for recovery
        builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
        builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
        return builder.build();
    }
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT =
    timeSetting("transport.tcp.connect_timeout", NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT, Setting.Property.NodeScope);

节点建立连接超时逻辑

TcpTransport.openConnection(DiscoveryNode node, ConnectionProfile connectionProfile)方法建立通信管道时,在通信之前重组连接的默认配置和自定义配置。在Netty4Transport.connectToChannels()方法内具体生效,future.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));

增加节点的方式

TransportClient类提供了数组方式增加节点和单个节点增加的方式,

    public TransportClient addTransportAddress(TransportAddress transportAddress) {
        nodesService.addTransportAddresses(transportAddress);
        return this;
    }
    
    public TransportClient addTransportAddresses(TransportAddress... transportAddress) {
        nodesService.addTransportAddresses(transportAddress);
        return this;
    }

不过根据代码,发现其都是调用的TransportClientNodesService类的addTransportAddresses(TransportAddress... transportAddresses)方法

    public TransportClientNodesService addTransportAddresses(TransportAddress... transportAddresses) {
        // 竞争对象锁mutex
        synchronized (mutex) {
            if (closed) {
                throw new IllegalStateException("transport client is closed, can't add an address");
            }
            List<TransportAddress> filtered = new ArrayList<>(transportAddresses.length);
            for (TransportAddress transportAddress : transportAddresses) {
                boolean found = false;
                for (DiscoveryNode otherNode : listedNodes) {
                    // 方式连接地址值重复,会自动过滤
                    if (otherNode.getAddress().equals(transportAddress)) {
                        found = true;
                        logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode);
                        break;
                    }
                }
                if (!found) {
                    filtered.add(transportAddress);
                }
            }
            if (filtered.isEmpty()) {
                return this;
            }
            List<DiscoveryNode> builder = new ArrayList<>(listedNodes);
            for (TransportAddress transportAddress : filtered) {
                DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(),
                        transportAddress, Collections.emptyMap(), Collections.emptySet(), minCompatibilityVersion);
                logger.debug("adding address [{}]", node);
                builder.add(node);
            }
            // listNodes里面存放的是配置的连接节点列表
            listedNodes = Collections.unmodifiableList(builder);
            // 调用不同的节点采集-里面也对mutex锁进行竞争
            nodesSampler.sample();
        }
        return this;
    }

NodeSampler.sample()

		public void sample() {
            synchronized (mutex) {
                if (closed) {
                    return;
                }
                doSample();
            }
        }

NodesSampler有两个具体的继承实现类

  • SniffNodesSampler:开启嗅探属性的客户端
  • SimpleNodeSampler:简单客户端

这边对SniffNodesSamplersample()方法进行分析。

        @Override
        protected void doSample() {
            Set<DiscoveryNode> nodesToPing = new HashSet<>();
            // 最新要进行连接的一组节点列表
            for (DiscoveryNode node : listedNodes) {
                nodesToPing.add(node);
            }
            // nodes代表已经连接上的节点列表
            for (DiscoveryNode node : nodes) {
                nodesToPing.add(node);
            }
            // 并发控制辅助类
            final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
            final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();
            try {
                for (final DiscoveryNode nodeToPing : nodesToPing) {
                    // 采用线程池的方式去连接节点
                    threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
                        Transport.Connection connectionToClose = null;

                        void onDone() {
                            try {
                                IOUtils.closeWhileHandlingException(connectionToClose);
                            } finally {
                                latch.countDown();
                            }
                        }

                        @Override
                        public void onFailure(Exception e) {
                            onDone();
                            ...
                            ...
                        }

                        @Override
                        protected void doRun() throws Exception {
                            Transport.Connection pingConnection = null;
                            if (nodes.contains(nodeToPing)) {
                                try {
                                    pingConnection = transportService.getConnection(nodeToPing);
                                } catch (NodeNotConnectedException e) {
                                    // will use a temp connection
                                }
                            }
                            if (pingConnection == null) {
                                logger.trace("connecting to cluster node [{}]", nodeToPing);
                                // 尝试去连接节点,超时会抛出异常
                                connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);
                                pingConnection = connectionToClose;
                            }
                            // 若有一个节点连接成功会进行集群状态查询,返回值里面包含了全部可用节点
                            transportService.sendRequest(pingConnection, ClusterStateAction.NAME,
                                Requests.clusterStateRequest().clear().nodes(true).local(true),
                                TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE)
                                    .withTimeout(pingTimeout).build(),
                                new TransportResponseHandler<ClusterStateResponse>() {

                                    @Override
                                    public ClusterStateResponse newInstance() {
                                        return new ClusterStateResponse();
                                    }

                                    @Override
                                    public String executor() {
                                        return ThreadPool.Names.SAME;
                                    }

                                    @Override
                                    public void handleResponse(ClusterStateResponse response) {
                                        clusterStateResponses.put(nodeToPing, response);
                                        onDone();
                                    }

                                    @Override
                                    public void handleException(TransportException e) {
                                        logger.info(
                                            (Supplier<?>) () -> new ParameterizedMessage(
                                                "failed to get local cluster state for {}, disconnecting...", nodeToPing), e);
                                        try {
                                            hostFailureListener.onNodeDisconnected(nodeToPing, e);
                                        } finally {
                                            onDone();
                                        }
                                    }
                                });
                        }
                    });
                }
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }

            HashSet<DiscoveryNode> newNodes = new HashSet<>();
            HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
            for (Map.Entry<DiscoveryNode, ClusterStateResponse> entry : clusterStateResponses.entrySet()) {
                if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {
                    logger.warn("node {} not part of the cluster {}, ignoring...",
                            entry.getValue().getState().nodes().getLocalNode(), clusterName);
                    newFilteredNodes.add(entry.getKey());
                    continue;
                }
                for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().getDataNodes().values()) {
                    newNodes.add(cursor.value);
                }
            }
            // 验证新节点是否可连接
            nodes = validateNewNodes(newNodes);
            filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
        }

通过代码发现,其实用了线程池并发连接节点,但是也使用了CountDownLatch,这就导致了,如果有一个节点超时,那整个批次都需要等待这么长的时间。典型的长尾效应

为啥超时间会出现翻倍

建立TransportClientNodesService服务时,构造函数中增加了对NodeSampler的调度。

    TransportClientNodesService(Settings settings, TransportService transportService,
                                       ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) {
       ...
       ...
       ...
        this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());
    }

ScheduledNodeSampler

当调度触发之后,也会去执行nodesSampler.sample();,也就对mutex锁有了竞争,当调用增加连接方法之后,就会有两次调用 nodesSampler.sample();也就会将超时时间翻倍。

class ScheduledNodeSampler implements Runnable {
    @Override
    public void run() {
        try {
            nodesSampler.sample();
            if (!closed) {
                nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, this);
            }
        } catch (Exception e) {
            logger.warn("failed to sample", e);
        }
    }
}

优化方案

  1. Settings增加超时transport的tcp超时配置。

     final Settings settings = Settings.builder()
                    .put("cluster.name", "common-es")
                    .put("transport.tcp.connect_timeout", "5s")
                    .put("client.transport.sniff", true).build();
    

    注意此配置的参数名不同版本之间存在差异。

  2. 使用数组方式增加连接节点,减少反复调用TransportClientNodesService addTransportAddresses次数,就是在减少分批次的产生阻塞耗时


http://www.niftyadmin.cn/n/5111189.html

相关文章

google登录k8s dashboard ui显示“您的连接不是私密连接”问题解决梳理

1.问题描述 OS Version:CentOS Linux release 7.9.2009 (Core) K8S Version:Kubernetes v1.20.4 k8s dashboard ui安装完毕后&#xff0c;通过google浏览器登录返现https网页&#xff0c;发现非官方的https网页无法打开 网址&#xff1a;https://192.168.10.236:31001 2.原…

C++:类的默认成员函数------构造函数析构函数(超详细解析,小白一看就懂!)

目录 一、前言 二、为什么会出现构造函数和析构函数 三、构造函数 &#x1f34e;构造函数的概念 &#x1f350;构造函数特性 &#x1f4a6;解释特性3&#xff1a;对象实例化时编译器自动调用对应的构造函数 &#x1f4a6;解释特性4&#xff1a;构造函数支持重载 &…

C#接口和继承的区别、联系与使用场景

在C#编程语言中&#xff0c;接口和继承是两个核心的概念。本文将详细介绍接口和继承之间的区别与联系&#xff0c;并探讨它们在实际编程中的使用场景。通过代码示例和详细说明&#xff0c;读者将能够深入理解这两个概念的功能和用法。 目录 引言1. 区别与联系1.1 区别1.2 联系 …

mysql检验分区性能的操作

mysql检验分区性能的操作 创建两个结构相同但是一个有分区另外一个没有分区的表 如上图我们给part_tab5创建的分区为1024个&#xff0c;因为mysql中允许最多有1024个分区&#xff1b;之前我测试的是创建8个分区&#xff0c;然后插入500万条数据&#xff0c;然后按照id查询&…

实际项目中最常用的设计模式

在软件开发领域,设计模式是一种经过验证的通用解决方案,用于解决各种常见问题。它们有助于提高代码的可维护性、可扩展性和可重用性。虽然有许多不同的设计模式,但以下是实际项目中最常用的一些: 1. 单例模式 (Singleton Pattern) 单例模式确保一个类只有一个实例,并提供…

使用命令行cli脚手架创建uniapp项目(微信小程序、H5、APP)

除了使用HBuilderX工具可视化搭建项目外&#xff0c;DCloud官方还提供了一个脚手架用于命令行搭建项目。 uni-app项目支持 uni cli和 HBuilderX cli两种脚手架工具&#xff1a; uni cli&#xff1a;面向非HBuilderX的用户&#xff08;如习惯使用vscode/webstorm的开发者&#…

Vue2基础知识(三) 组件化

目录 一 组件1.1 组件的定义1.2 特点1.3 Vue-extend1.4 VueCompent 二 脚手架2.1 安装2.2 结构目录2.3 Render函数2.4 修改默认配置2.5 Ref 属性2.6 Prop 属性2.7 Mixin 属性2.8 插件2.9 Scoped 三 组件3.1 组件的注册3.1.1 局部注册3.1.2 全局注册 3.2 组件的通信3.2.1 父子关…

MySQL数据库详细解析

目录 MySQL 数据库的工作原理 MySQL 的核心特性 1. 支持 SQL 2. 多存储引擎支持 3. ACID 兼容 4. 多平台支持 5. 高性能 6. 复制和故障转移 7. 安全性 8. 社区支持和商业支持 MySQL 数据库的最佳实践 结论 MySQL 是一种开源的关系型数据库管理系统&#xff08;RDBM…