1.配置
import com.zty.common.util.WebUtils; import io.netty.channel.ChannelOption; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.WriteTimeoutHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.web.reactive.function.client.WebClient; import reactor.netty.http.client.HttpClient; import reactor.netty.resources.ConnectionProvider; import java.time.Duration; /** * WebClient配置 * @author zty */ @Slf4j @Configuration public class WebClientConfig { @Bean public WebClient webClient(){ //配置固定大小连接池 ConnectionProvider provider = ConnectionProvider .builder("custom") // 等待超时时间 .pendingAcquireTimeout(Duration.ofSeconds(10)) // 最大连接数 .maxConnections(200) // 最大空闲时间 .maxIdleTime(Duration.ofSeconds(5)) // 最大等待连接数量 .pendingAcquireMaxCount(-1) .build(); /** * doOnBind 当服务器channel即将被绑定的时候调用。 * doOnBound 当服务器channel已经被绑定的时候调用。 * doOnChannelInit 当channel初始化的时候被调用。 * doOnConnection 当一个远程客户端连接上的时候被调用。 * doOnUnbound 当服务器channel解绑的时候被调用。 */ HttpClient httpClient = HttpClient.create(provider) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 6000) .option(ChannelOption.SO_KEEPALIVE, true) .responseTimeout(Duration.ofSeconds(6)) .keepAlive(true) //连接成功 .doOnConnected(connection -> connection.addHandlerLast(new ReadTimeoutHandler(6)) .addHandlerLast(new WriteTimeoutHandler(6))) //每次请求后执行flush,防止服务器主动断开连接 .doAfterRequest((httpClientRequest, connection) -> { connection.channel().alloc().buffer().release(); connection.channel().flush(); connection.channel().pipeline().flush(); }); return WebClient.builder() .baseUrl("http://127.0.0.1:8080") .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) .defaultHeader(HttpHeaders.CONNECTION, "keep-alive") .clientConnector(new ReactorClientHttpConnector(httpClient)) .build(); } }
2.使用
import com.alibaba.fastjson2.JSON; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; import java.util.HashMap; /** * http消息发送工具 * @author zty */ @Slf4j @Component public class WebUtils { @Autowired WebClient webClient; @Async public void post(HashMap message){ // 发送请求 webClient.post() // 请求路径 .uri("/test") // 携带参数 .bodyValue(JSON.toJSONString(message)) // 获取响应体 .retrieve() // 响应数据类型转换 .bodyToMono(String.class) .doOnError(throwable -> log.info(throwable.getMessage())) .onErrorResume(e -> Mono.just("Error " + e.getMessage())) // 异步 .subscribe(); } }

(图片来源网络,侵删)