Spring Cloud Feign内部实现代码细节

1. 概述

Feign用于服务间调用,它的内部实现是一个包含Ribbon(负载均衡)的**JDK-HttpURLConnection(Http)**调用。虽然调用形式是类似于RPC,但是实际调用是Http,这也是为什么Feign被称为伪RPC调用的原因。
内部调用过程如下:

2. 代码细节

1) BaseLoadBalancer.java配置初始化

重点功能: 1. 初始化负载均衡策略 2. 初始化取服务注册列表调度策略

void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {
    ...
    // 每隔30s Ping一次
    int pingIntervalTime = Integer.parseInt(""
            + clientConfig.getProperty(
                    CommonClientConfigKey.NFLoadBalancerPingInterval,
                    Integer.parseInt("30")));
    // 每次最多Ping 2s
    int maxTotalPingTime = Integer.parseInt(""
            + clientConfig.getProperty(
                    CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
                    Integer.parseInt("2")));
    setPingInterval(pingIntervalTime);
    setMaxTotalPingTime(maxTotalPingTime);

    // cross associate with each other
    // i.e. Rule,Ping meet your container LB
    // LB, these are your Ping and Rule guys ...
    // 设置负载均衡规则
    setRule(rule);
    // 初始化取服务注册列表调度策略
    setPing(ping);

    setLoadBalancerStats(stats);
    rule.setLoadBalancer(this);
    ...
}

2) 负载均衡策略初始化

重点功能: 1. 默认使用轮询策略

BaseLoadBalancer.java

public void setRule(IRule rule) {
    if (rule != null) {
        this.rule = rule;
    } else {
        /* default rule */
        // 默认使用轮询策略
        this.rule = new RoundRobinRule();
    }
    if (this.rule.getLoadBalancer() != this) {
        this.rule.setLoadBalancer(this);
    }
}

RoundRobinRule.java

private AtomicInteger nextServerCyclicCounter;

public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        log.warn("no load balancer");
        return null;
    }

    Server server = null;
    int count = 0;
    while (server == null && count++ < 10) {
        List<Server> reachableServers = lb.getReachableServers();
        List<Server> allServers = lb.getAllServers();
        int upCount = reachableServers.size();
        int serverCount = allServers.size();

        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }
        // 轮询重点算法
        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);

        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }

        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }

        // Next.
        server = null;
    }

    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: "
                + lb);
    }
    return server;
}

private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextServerCyclicCounter.get();
        int next = (current + 1) % modulo;
        if (nextServerCyclicCounter.compareAndSet(current, next))
            return next;
    }
}

3) 初始化取服务注册列表调度策略

重点功能: 1. 设置轮询间隔为30s 一次

注意: 这里没有做实际的Ping,只是获取缓存的注册列表的alive服务,原因是为了提高性能

BaseLoadBalancer.java

public void setPing(IPing ping) {
    if (ping != null) {
        if (!ping.equals(this.ping)) {
            this.ping = ping;
            setupPingTask(); // since ping data changed
        }
    } else {
        this.ping = null;
        // cancel the timer task
        lbTimer.cancel();
    }
}

void setupPingTask() {
    if (canSkipPing()) {
        return;
    }
    if (lbTimer != null) {
        lbTimer.cancel();
    }
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
            true);
    // 这里虽然默认设置是10s一次,但是在初始化的时候,设置了30s一次
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
}

class Pinger {

    private final IPingStrategy pingerStrategy;

    public Pinger(IPingStrategy pingerStrategy) {
        this.pingerStrategy = pingerStrategy;
    }

    public void runPinger() throws Exception {
        if (!pingInProgress.compareAndSet(false, true)) {
            return; // Ping in progress - nothing to do
        }

        // we are "in" - we get to Ping

        Server[] allServers = null;
        boolean[] results = null;

        Lock allLock = null;
        Lock upLock = null;

        try {
            /*
             * The readLock should be free unless an addServer operation is
             * going on...
             */
            allLock = allServerLock.readLock();
            allLock.lock();
            allServers = allServerList.toArray(new Server[allServerList.size()]);
            allLock.unlock();

            int numCandidates = allServers.length;
            results = pingerStrategy.pingServers(ping, allServers);

            final List<Server> newUpList = new ArrayList<Server>();
            final List<Server> changedServers = new ArrayList<Server>();

            for (int i = 0; i < numCandidates; i++) {
                boolean isAlive = results[i];
                Server svr = allServers[i];
                boolean oldIsAlive = svr.isAlive();

                svr.setAlive(isAlive);

                if (oldIsAlive != isAlive) {
                    changedServers.add(svr);
                    logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}",
                        name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                }

                if (isAlive) {
                    newUpList.add(svr);
                }
            }
            upLock = upServerLock.writeLock();
            upLock.lock();
            upServerList = newUpList;
            upLock.unlock();

            notifyServerStatusChangeListener(changedServers);
        } finally {
            pingInProgress.set(false);
        }
    }
}

private static class SerialPingStrategy implements IPingStrategy {

    @Override
    public boolean[] pingServers(IPing ping, Server[] servers) {
        int numCandidates = servers.length;
        boolean[] results = new boolean[numCandidates];

        logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);

        for (int i = 0; i < numCandidates; i++) {
            results[i] = false; /* Default answer is DEAD. */
            try {
                // NOTE: IFF we were doing a real ping
                // assuming we had a large set of servers (say 15)
                // the logic below will run them serially
                // hence taking 15 times the amount of time it takes
                // to ping each server
                // A better method would be to put this in an executor
                // pool
                // But, at the time of this writing, we dont REALLY
                // use a Real Ping (its mostly in memory eureka call)
                // hence we can afford to simplify this design and run
                // this
                // serially
                if (ping != null) {
                    results[i] = ping.isAlive(servers[i]);
                }
            } catch (Exception e) {
                logger.error("Exception while pinging Server: '{}'", servers[i], e);
            }
        }
        return results;
    }
}

4) 最后拼接完整URL使用JDK-HttpURLConnection进行调用

SynchronousMethodHandler.java(io.github.openfeign:feign-core:10.10.1/feign.SynchronousMethodHandler.java)

Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
    Request request = this.targetRequest(template);
    if (this.logLevel != Level.NONE) {
        this.logger.logRequest(this.metadata.configKey(), this.logLevel, request);
    }

    long start = System.nanoTime();

    Response response;
    try {
        response = this.client.execute(request, options);
        response = response.toBuilder().request(request).requestTemplate(template).build();
    } catch (IOException var13) {
        if (this.logLevel != Level.NONE) {
            this.logger.logIOException(this.metadata.configKey(), this.logLevel, var13, this.elapsedTime(start));
        }

        throw FeignException.errorExecuting(request, var13);
    }
    ...
}

LoadBalancerFeignClient.java

@Override
public Response execute(Request request, Request.Options options) throws IOException {
    try {
        URI asUri = URI.create(request.url());
        String clientName = asUri.getHost();
        URI uriWithoutHost = cleanUrl(request.url(), clientName);
        FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                this.delegate, request, uriWithoutHost);

        IClientConfig requestConfig = getClientConfig(options, clientName);
        return lbClient(clientName)
                .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
    }
    catch (ClientException e) {
        IOException io = findIOException(e);
        if (io != null) {
            throw io;
        }
        throw new RuntimeException(e);
    }
}

AbstractLoadBalancerAwareClient.java

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

    try {
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    // 获取真实访问URL
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    }
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
        Throwable t = e.getCause();
        if (t instanceof ClientException) {
            throw (ClientException) t;
        } else {
            throw new ClientException(e);
        }
    }

}

FeignLoadBalancer.java

@Override
public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
        throws IOException {
    Request.Options options;
    if (configOverride != null) {
        RibbonProperties override = RibbonProperties.from(configOverride);
        options = new Request.Options(override.connectTimeout(this.connectTimeout),
                override.readTimeout(this.readTimeout));
    }
    else {
        options = new Request.Options(this.connectTimeout, this.readTimeout);
    }
    Response response = request.client().execute(request.toRequest(), options);
    return new RibbonResponse(request.getUri(), response);
}

feign.Client.java

@Override
public Response execute(Request request, Options options) throws IOException {
  HttpURLConnection connection = convertAndSend(request, options);
  return convertResponse(connection, request);
}

Response convertResponse(HttpURLConnection connection, Request request) throws IOException {
  // 使用HttpURLConnection 真实进行Http调用
  int status = connection.getResponseCode();
  String reason = connection.getResponseMessage();

  if (status < 0) {
    throw new IOException(format("Invalid status(%s) executing %s %s", status,
        connection.getRequestMethod(), connection.getURL()));
  }

  Map<String, Collection<String>> headers = new LinkedHashMap<>();
  for (Map.Entry<String, List<String>> field : connection.getHeaderFields().entrySet()) {
    // response message
    if (field.getKey() != null) {
      headers.put(field.getKey(), field.getValue());
    }
  }

  Integer length = connection.getContentLength();
  if (length == -1) {
    length = null;
  }
  InputStream stream;
  if (status >= 400) {
    stream = connection.getErrorStream();
  } else {
    stream = connection.getInputStream();
  }
  return Response.builder()
      .status(status)
      .reason(reason)
      .headers(headers)
      .request(request)
      .body(stream, length)
      .build();
}

拓展干货阅读:一线大厂面试题、高并发等主流技术资料

以上就是Spring Cloud Feign内部实现代码细节的详细内容,更多关于Spring Cloud Feign的资料请关注我们其它相关文章!

(0)

相关推荐

  • 详解spring cloud feign踩坑记录

    1:多客户端时,feign接口抽取到公共jar中,此时,客户端的启动类上需要对该jar中feign所在的包进行扫描,要在spring和feign中同时注册,否则启动时会报:"Consider defining a bean of type '******Feign' in your configuration." @SpringBootApplication @EnableTransactionManagement @EnableDiscoveryClient @ComponentSc

  • SpringCloud 服务负载均衡和调用 Ribbon、OpenFeign的方法

    1.Ribbon Spring Cloud Ribbon是基于Netflix Ribbon实现的-套客户端―负载均衡的工具. 简单的说,Ribbon是Netlix发布的开源项目,主要功能是提供客户端的软件负载均衡算法和服务调用.Ribbon客户端组件提供一系列完善的配置项如连接超时,重试等.简单的说,就是在配置文件中列出Load Balancer(简称LB)后面所有的机器,Ribbon会自动的帮助你基于某种规则(如简单轮询,随机连接等)去连接这些机器.我们很容易使用Ribbon实现自定义的负载均

  • 详解springcloud 基于feign的服务接口的统一hystrix降级处理

    springcloud开发微服务时,基于feign来做声明式服务接口,当启用hystrix服务熔断降级时,项目服务众多,每个Feign服务接口都得写一些重复问的服务降级处理代码,势必显得枯燥无味: Feign服务接口: @FeignClient(name="springcloud-nacos-producer", qualifier="productApiService", contextId="productApiService", fallb

  • Spring Cloud如何使用Feign构造多参数的请求

    本节我们来探讨如何使用Feign构造多参数的请求.笔者以GET以及POST方法的请求为例进行讲解,其他方法(例如DELETE.PUT等)的请求原理相通,读者可自行研究. GET请求多参数的URL 假设我们请求的URL包含多个参数,例如http://microservice-provider-user/get?id=1&username=张三 ,要如何构造呢? 我们知道,Spring Cloud为Feign添加了Spring MVC的注解支持,那么我们不妨按照Spring MVC的写法尝试一下:

  • 解决Spring Cloud Feign 请求时附带请求头的问题

    问题描述 Feign 在请求时是不会将 request 的请求头带着请求的,导致假如 Feign 调用的接口需要请求头的信息,比如当前用户的 token 之类的就获取不到 解决方案 FeignConfiguration 通过实现 Feign 的 RequestInterceptor 将从上下文中获取到的请求头信息循环设置到 Feign 请求头中. /** * feign 配置文件 * 将请求头中的参数,全部作为 feign 请求头参数传递 * @author: linjinp * @create

  • 详解Spring Cloud Feign 熔断配置的一些小坑

    1.在使用feign做服务调用时,使用继承的方式调用服务,加入Hystrix的熔断处理fallback配置时,会报错,已解决. 2.使用feign默认配置,熔断不生效,已解决. 最近在做微服务的学习,发现在使用feign做服务调用时,使用继承的方式调用服务,加入Hystrix的熔断处理fallback配置时,会报错,代码如下: @RequestMapping("/demo/api") public interface HelloApi { @GetMapping("user/

  • Spring Cloud Feign 自定义配置(重试、拦截与错误码处理) 代码实践

    基于 spring-boot-starter-parent 2.1.9.RELEASE, spring-cloud-openfeign 2.1.3.RELEASE 引子 Feign 是一个声明式.模板化的HTTP客户端,简化了系统发起Http请求.创建它时,只需要创建一个接口,然后加上FeignClient注解,使用它时,就像调用本地方法一样,作为开发者的我们完全感知不到这是在调用远程的方法,也感知不到背后发起了HTTP请求: /** * @author axin * @suammry xx 客

  • Spring Cloud Feign文件传输的示例代码

    一.配置文件解析器 服务提供者和消费者都需要配置文件解析器,这里使用 commons-fileupload 替换原有的解析器: 依赖: <dependency> <groupId>commons-fileupload</groupId> <artifactId>commons-fileupload</artifactId> <version>1.3.1</version> </dependency> 注入 be

  • SpringCloud Feign参数问题及解决方法

    这篇文章主要介绍了SpringCloud Feign参数问题及解决方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 今天遇到使用Feign调用微服务,传递参数时遇到几个问题 1.无参数 以GET方式请求 服务提供者 @RequestMapping("/hello") public String Hello(){ return "hello,provider"; } 服务消费者 @GetMapping("

  • Spring Cloud Feign内部实现代码细节

    1. 概述 Feign用于服务间调用,它的内部实现是一个包含Ribbon(负载均衡)的**JDK-HttpURLConnection(Http)**调用.虽然调用形式是类似于RPC,但是实际调用是Http,这也是为什么Feign被称为伪RPC调用的原因. 内部调用过程如下: 2. 代码细节 1) BaseLoadBalancer.java配置初始化 重点功能: 1. 初始化负载均衡策略 2. 初始化取服务注册列表调度策略 void initWithConfig(IClientConfig cli

  • Spring Cloud Feign性能优化代码实例

    1.替换 tomcat 首先,把 tomcat 换成 undertow,这个性能在 Jmeter 的压测下,undertow 比 tomcat 高一倍第一步,pom 修改去除tomcat <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <ex

  • Spring Cloud Feign实现文件上传下载的示例代码

    目录 独立使用Feign 上传文件 下载文件 使用Spring Cloud Feign 上传文件 下载文件 总结 Feign框架对于文件上传消息体格式并没有做原生支持,需要集成模块feign-form来实现. 独立使用Feign 添加模块依赖: <!-- Feign框架核心 --> <dependency> <groupId>io.github.openfeign</groupId> <artifactId>feign-core</arti

  • Spring Cloud Feign的文件上传实现的示例代码

    在Spring Cloud封装的Feign中并不直接支持传文件,但可以通过引入Feign的扩展包来实现,本来就来具体说说如何实现. 服务提供方(接收文件) 服务提供方的实现比较简单,就按Spring MVC的正常实现方式即可,比如: @RestController public class UploadController { @PostMapping(value = "/uploadFile", consumes = MediaType.MULTIPART_FORM_DATA_VAL

  • 使用Spring Cloud Feign上传文件的示例

    最近经常有人问Spring Cloud Feign如何上传文件.有团队的新成员,也有其他公司的兄弟.本文简单做个总结-- 早期的Spring Cloud中,Feign本身是没有上传文件的能力的(1年之前),要想实现这一点,需要自己去编写Encoder 去实现上传.现在我们幸福了很多.因为Feign官方提供了子项目feign-form ,其中实现了上传所需的 Encoder . 注:笔者测试的版本是Edgware.RELEASE.Camden.Dalston同样适应本文所述. 加依赖 <depen

  • Spring cloud Feign 深度学习与应用详解

    简介 Spring Cloud Feign是一个声明式的Web Service客户端,它的目的就是让Web Service调用更加简单.Feign提供了HTTP请求的模板,通过编写简单的接口和插入注解,就可以定义好HTTP请求的参数.格式.地址等信息.Feign会完全代理HTTP请求,开发时只需要像调用方法一样调用它就可以完成服务请求及相关处理.开源地址:https://github.com/OpenFeign/feign.Feign整合了Ribbon负载和Hystrix熔断,可以不再需要显式地

  • Spring Cloud Feign组件实例解析

    这篇文章主要介绍了Spring Cloud Feign组件实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 采用Spring Cloud微服务框架后,经常会涉及到服务间调用,服务间调用采用了Feign组件. 由于之前有使用dubbo经验.dubbo的负载均衡策略(轮训.最小连接数.随机轮训.加权轮训),dubbo失败策略(快速失败.失败重试等等), 所以Feign负载均衡策略的是什么? 失败后是否会重试,重试策略又是什么?带这个疑问,查了

  • Spring Cloud Feign 使用对象参数的操作

    目录 概述 @RequestBody @SpringQueryMap QueryMapEncoder 解决方案 概述 Spring Cloud Feign 用于微服务的封装,通过接口代理的实现方式让微服务调用变得简单,让微服务的使用上如同本地服务.但是它在传参方面不是很完美.在使用 Feign 代理 GET 请求时,对于简单参数(基本类型.包装器.字符串)的使用上没有困难,但是在使用对象传参时却无法自动的将对象包含的字段解析出来. 如果你没耐心看完,直接跳到最后一个标题跟着操作就行了. @Req

  • Spring Cloud Feign实例讲解学习

    前面博文搭建了一个Eureka+Ribbon+Hystrix的框架,虽然可以基本满足服务之间的调用,但是代码看起来实在丑陋,每次客户端都要写一个restTemplate,为了让调用更美观,可读性更强,现在我们开始学习使用Feign. Feign包含了Ribbon和Hystrix,这个在实战中才慢慢体会到它的意义,所谓的包含并不是Feign的jar包包含有Ribbon和Hystrix的jar包这种物理上的包含,而是Feign的功能包含了其他两者的功能这种逻辑上的包含.简言之:Feign能干Ribb

随机推荐