Java 网络编程
本文主要探讨Java网络编程,其实这部分内容也属于Java IO,但是因为本身体量较大,内容较多,又是Java开发的重点,所以单独成一章来写。
文中会涉及到IO的基础知识,如果你对他们还没有掌握,请参考:Java IO
文章分为两部分:Socket编程和HTTP编程。
# 网络协议分层
在了解具体的Java网络编程之前,我们需要对网络协议有一个初步的认识。
因为网络协议的复杂性,所以要分层,结构如下:
实践当中,TCP/IP四层协议见得比较多。简单来说,四层协议的作用如下:
- 数据链路层:解决路由器、交换机之间的互联问题
- 网络层:解决主机之间的互联问题
- 传输层:解决程序之间的互联问题
- 应用层:解决上层内容之间的互联问题
# Socket编程
Socket是TCP/IP协议栈中不同主机应用进程之间进行双向通信的端点抽象。
Socket编程是传输层的编程,主要解决主机-应用进程之间通信的问题。
比如主机A中的Client程序要与主机B中的Server程序进行通信,Client发送“我要吃肉”,Server回复“地主家没有余粮了”。
# 1、传统IO
在传统Java IO中的实现,服务端:
void server(){
try (ServerSocket server = new ServerSocket(8888);) {
while (true){
Socket socket = server.accept();
InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
Scanner scanner = new Scanner(is);
PrintWriter pw = new PrintWriter(new OutputStreamWriter(os, StandardCharsets.UTF_8), true);
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
if (line.startsWith("我要吃肉")){
pw.println("地主家没有余粮了");
}else {
pw.println("我不知道你在说什么");
}
if ("exit".equals(line)) {
pw.println("exit");
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
客户端:
void client(){
try (Socket socket = new Socket("localhost", 8888);
InputStream is = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream();) {
PrintWriter pw = new PrintWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8), true);
Scanner scanner = new Scanner(is, StandardCharsets.UTF_8);
pw.println("我要吃肉");
pw.println("exit");
boolean done = false;
while (!done && scanner.hasNextLine()) {
String line = scanner.nextLine();
System.out.println(line);
if (line.equals("exit")){
done = true;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
上面的server程序只能为一个客户端服务,如果要为多个客户端服务,需要引入多线程:
try (ServerSocket server = new ServerSocket(8888)) {
while (true) {
Socket socket = server.accept();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
// 套接字处理程序
}
});
thread.start();
}
} catch (IOException e) {
e.printStackTrace();
}
每一个Client都启动一个线程有点浪费,也可以考虑引入线程池。
# 2、NIO
传统IO是阻塞IO,应对几百个连接的普通应用,还是得心应手的。如果连接数急剧上升,那么按照传统IO的方式编程,线程的上下文切换的开销会很大。
为了解决传统IO的问题,Java 1.4引入了NIO,NIO 利用了单线程轮询事件的机制,通过高效地定位就绪的 Channel,来决定做什么,仅仅 select 阶段是阻塞的,这样就可以有效避免大量客户端连接时,频繁线程切换带来的问题,应用的扩展能力有了非常大的提高。
在 NIO 模型中,通过一个线程来监控多个通道的状态,并判断哪些通道已经就绪(即有 I/O 操作可执行)。这个监控和判断的过程称为 "select" 阶段。在此阶段,线程会阻塞,直到至少一个通道就绪(或者一段时间超时)。
一旦至少一个通道就绪,线程就会从阻塞状态中唤醒,并继续执行后续操作。这些后续操作可以包括读取就绪通道的数据、写入数据到就绪通道等。这样,使用单线程轮询事件的方式,可以有效地处理多个非阻塞通道的 I/O 操作。
虽然在 "select" 阶段线程会阻塞,但整个 NIO 模型是非阻塞的。因为一旦线程从阻塞状态中唤醒,它就会立即执行处理就绪通道的操作,而不会等待其他通道的 I/O 操作完成。这样可以充分利用单线程处理多个通道,并避免线程被 I/O 操作阻塞的问题。
服务端示例:
void server() throws IOException {
Selector selector = Selector.open();
ServerSocketChannel ssChannel = ServerSocketChannel.open();
ssChannel.configureBlocking(false);
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
ServerSocket serverSocket = ssChannel.socket();
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888);
serverSocket.bind(address);
while (true) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = keys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
ServerSocketChannel ssChannel1 = (ServerSocketChannel) key.channel();
// 服务器会为每个新连接创建一个 SocketChannel
SocketChannel sChannel = ssChannel1.accept();
sChannel.configureBlocking(false);
// 这个新连接主要用于从客户端读取数据
sChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel sChannel = (SocketChannel) key.channel();
String line = readDataFromSocketChannel(sChannel);
if (line.startsWith("我要吃肉")){
sChannel.write(StandardCharsets.UTF_8.encode("地主家没有余粮了"));
}else {
sChannel.write(StandardCharsets.UTF_8.encode("我不知道你在说什么"));
}
sChannel.close();
}
keyIterator.remove();
}
}
}
private static String readDataFromSocketChannel(SocketChannel sChannel) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
StringBuilder data = new StringBuilder();
while (true) {
buffer.clear();
int n = sChannel.read(buffer);
if (n <= 0) {
break;
}
buffer.flip();
int limit = buffer.limit();
data.append(StandardCharsets.UTF_8.decode(buffer));
buffer.clear();
}
return data.toString();
}
NIO的逻辑是:
- Selector是一个调度器
- 创建ServerSocketChannel,并向Selector注册。SelectionKey.OP_ACCEPT表示关注的是新请求,ssChannel.configureBlocking(false)表示非阻塞
- Selector 阻塞在 select 操作,当有 Channel 发生接入请求,就会被唤醒
- 唤醒之后,对key进行批量处理。如果key是连接事件,则把此通道上注册为数据读取;如果key是读取事件,则进行对应的处理
客户端的逻辑基本没变化:
void client() throws IOException {
Socket socket = new Socket("127.0.0.1", 8888);
OutputStream out = socket.getOutputStream();
out.write("我要吃肉".getBytes());
Scanner scanner = new Scanner(socket.getInputStream(), StandardCharsets.UTF_8);
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
System.out.println(line);
}
out.close();
}
有人问,当请求量变大了,单线程处理是否效率太低呢?
其实NIO 可以使用多线程来处理请求,以提高系统的处理效率和吞吐量。只需要在处理SelectionKey key = keyIterator.next();
的时候引入线程池即可。
# 3、AIO
NIO的select,还属于同步IO范畴,也就是”应用程序不询问我,我绝不会主动通知“。select是用轮询来实现的。
在Java 7中引入了NIO 2,也可以称为A(Asynchronous)IO,也就是异步IO。
这是一种不同的编程思路:采用“订阅-通知”模式,即应用程序向操作系统注册IO监听,然后继续做自己的事情。当操作系统发生IO事件,并且准备好数据后,在主动通知应用程序,触发相应的函数。
示例代码:
public class AioSocketTest {
static final InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888);
private static final Object waitObject = new Object();
@Test
void server() throws IOException, InterruptedException {
AsynchronousServerSocketChannel serverSock = AsynchronousServerSocketChannel.open().bind(address);
serverSock.accept(serverSock, new CompletionHandler<>() { //为异步操作指定CompletionHandler回调函数
@SneakyThrows
@Override
public void completed(AsynchronousSocketChannel socketChannel, AsynchronousServerSocketChannel serverSock) {
serverSock.accept(serverSock, this);
ByteBuffer byteBuffer = ByteBuffer.allocate(50);
socketChannel.read(byteBuffer, new StringBuffer(), new IntegerStringBufferCompletionHandler(socketChannel, byteBuffer));
}
@Override
public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
}
});
//等待,以便观察现象(这个和要讲解的原理本身没有任何关系,只是为了保证守护线程不会退出)
synchronized(waitObject) {
waitObject.wait();
}
}
@Test
void client() throws IOException{
AsynchronousSocketChannel asynchronousSocketChannel = AsynchronousSocketChannel.open();
//连接服务端,异步方式
asynchronousSocketChannel.connect(address, asynchronousSocketChannel,
new CompletionHandler<>() {
@SneakyThrows
@Override
public void completed(Void result, AsynchronousSocketChannel attachment) {
System.out.println("连接成功");
attachment.write(ByteBuffer.wrap("我要吃肉".getBytes())).get();
ByteBuffer buffer =ByteBuffer.allocate(1024);
while (attachment.read(buffer).get() != -1) {
buffer.flip();
CharBuffer decode = Charset.defaultCharset().decode(buffer);
System.out.println(decode);
buffer.clear();
if (decode.toString().equals("exit")){
break;
}
attachment.write(ByteBuffer.wrap("exit".getBytes())).get();
}
}
@Override
public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
}
});
}
private static class IntegerStringBufferCompletionHandler implements CompletionHandler<Integer, StringBuffer> {
private final AsynchronousSocketChannel socketChannel;
private final ByteBuffer byteBuffer;
public IntegerStringBufferCompletionHandler(AsynchronousSocketChannel socketChannel, ByteBuffer byteBuffer) {
this.socketChannel = socketChannel;
this.byteBuffer = byteBuffer;
}
@SneakyThrows
@Override
public void completed(Integer result, StringBuffer attachment) {
if(result == -1) {
try {
socketChannel.close();
} catch (IOException e) {
log.error("socketChannel.close()",e);
}
return;
}
byteBuffer.flip();
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(byteBuffer);
byteBuffer.clear();
String line = charBuffer.toString();
System.out.println("接受到内容:"+line);
if ("exit".equals(line)) {
socketChannel.write(StandardCharsets.UTF_8.encode("exit")).get();
}
if (line.startsWith("我要吃肉")){
socketChannel.write(StandardCharsets.UTF_8.encode("地主家没有余粮了")).get();
}else {
socketChannel.write(StandardCharsets.UTF_8.encode("我不知道你在说什么")).get();
}
socketChannel.read(byteBuffer,attachment,this);
}
@Override
public void failed(Throwable exc, StringBuffer attachment) {
}
}
}
从代码中可以看到,使用了多是异步的编程方式。
AIO与NIO,两者在性能方面表现是近似的。它会比NIO要更加灵活,代码看起来也更加简洁。
在IO密集型的场景下,将线程池的个数设置为1,它效果NIO一样;对于计算密集型的场景,可以将线程池设置成与CPU核心数一致,这样可以最大限度的利用计算资源。
# 4、Netty
Java 的标准类库,由于其基础性、通用性的定位,往往过于关注技术模型上的抽象,而不是从一线应用开发者的角度去思考。开发者需要深入掌握线程、IO、网络等相关概念,学习路径很长,很容易导致代码复杂、晦涩,即使是有经验的工程师,也难以快速地写出高可靠性的实现。
业界流行的IO网络编程框架Netty,更加强调业务逻辑和技术逻辑的隔离,通过各种方便的抽象,让网络编程变得更加简单。
此外,从性能角度看,Netty不强调通用性、跨平台,他通过极致的Linux环境特定优化,性能有很大提升;
从网络协议角度看,Netty提供了TCP、UDP、SCTP协议,也支持HTTP、WebSocket等多种应用协议。
Netty的官方示例 (opens new window)
如果想系统学习Netty,推荐Netty权威指南 (opens new window)
# 5. 与Nginx并发模型的对比
Nginx 用的是非阻塞 I/O,采用的是多进程单线程结构,Nginx 被认为是一个 I/O 密集型系统:
高并发连接:Nginx的设计目标之一是能够处理大量的并发连接。它采用了非阻塞I/O模型和事件驱动机制,能够同时处理多个连接而不会阻塞其他连接的处理。这使得Nginx能够高效地处理大量的并发请求。
高效的反向代理和负载均衡:Nginx作为反向代理和负载均衡器,其主要任务是接收客户端请求并将其转发到后端服务器。这涉及到频繁的网络通信,包括接收请求、发送请求、接收响应等操作,这些操作都属于I/O操作。
静态文件服务:Nginx在处理静态文件时非常高效。由于静态文件通常以磁盘文件的形式存储,Nginx可以使用非阻塞I/O方式从磁盘读取文件,然后将文件发送给客户端。这样可以充分利用磁盘I/O的并行性,提高文件传输效率。
日志记录:Nginx具有强大的日志记录功能,可以记录每个请求的详细信息。日志记录涉及到频繁的文件写操作,这也是一种I/O操作。
我们上面介绍NIO的时候说到可以通过引入线程池来解决并发问题,其实Netty采用Reactor模式,也是引入了线程池,来解决并发问题。
那么,问题来了:为何Nginx可以采用单线程,而Netty还需要引入线程池呢?
另一个重要的中间件:Tomcat,本身也是采用了非阻塞IO+线程池。这里我们放在一起进行对比。
Tomcat和Netty要引入线程池的原因可能是:
功能和用途:Tomcat是一个Java Servlet容器,主要用于处理Web应用程序。它需要提供完整的Servlet规范支持,并且需要处理复杂的HTTP请求和响应。Netty则是一个通用的高性能网络应用框架,可以用于构建各种类型的网络应用。它更加灵活,并且可以自定义协议和处理逻辑。
多线程模型:Tomcat和Netty选择使用线程池的模型来处理并发请求。这种模型相对简单,易于理解和实现,并且适用于大多数Web应用场景。线程模型可以通过多线程来处理并发请求,每个请求分配一个独立的线程进行处理。这样可以充分利用多核CPU的计算能力,提供较高的并发处理能力。
并发处理:Nginx的并发模型主要针对高并发的静态文件服务和反向代理场景进行了优化,通过非阻塞I/O和事件驱动来处理大量的并发连接。而Tomcat和Netty更注重处理复杂的业务逻辑,需要更多的CPU计算能力和线程模型来处理请求。在这种情况下,使用线程池的模型更适合处理复杂的业务逻辑和计算操作。
总的来说,Tomcat和Netty选择了适合自身功能和用途的并发模型。它们更关注复杂的业务逻辑和计算处理,而不仅仅是简单的静态文件服务和反向代理。线程池的模型在处理这种类型的应用场景时更为合适,能够提供较高的灵活性和扩展性。
# HTTP编程
# 1、服务端
HTTP 服务端的选型非常多,从易用性角度,这里只推荐Spring Boot Webflux (opens new window)
其他的实现还有:
# 2、客户端
# HttpURLconnection
在Java 11之前,都是使用HttpURLConnection来做Client:
String turl = "https://www.baidu.com/";
URL url = new URL(turl);
//得到connection对象。
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
//设置请求方式
connection.setRequestMethod("GET");
//连接
connection.connect();
//得到响应码
int responseCode = connection.getResponseCode();
if(responseCode == HttpURLConnection.HTTP_OK){
//得到响应流
InputStream inputStream = connection.getInputStream();
//将响应流转换成字符串
System.out.println(IOUtils.toString(inputStream, StandardCharsets.UTF_8));
}
HttpURLConnection对象比较重,所以一般会采用对象池。
HttpURLConnection API对于处理代理、超时、文件等问题,易用性不高,业界流行的易用性封装是Apache Http Clien (opens new window)
我最常使用的是基于Apache HTTP Client封装的Unirest (opens new window),示例如下:
HttpResponse<JsonNode> response = Unirest.post("http://localhost/post")
.header("accept", "application/json")
.queryString("apiKey", "123")
.field("parameter", "value")
.field("firstname", "Gary")
.asJson();
Unirest默认使用的是Unirest.primaryInstance()实例,如果设置超时、代理、并发,则对于全局生效:
Unirest.config().defaultBaseUrl("http://homestar.com").connectTimeout(10000)
.socketTimeout(30000).proxy("127.0.0.1",8888)
.concurrency(200,10);
可以使用Unirest.spawnInstance()获得一个新实例,也就是获得了一个新的Client对象池。
UnirestInstance newUnirest = Unirest.spawnInstance();
newUnirest.config().defaultBaseUrl("http://homestar.com").connectTimeout(10000)
.socketTimeout(30000).proxy("127.0.0.1",8888)
.concurrency(200,10);
也可以在每个请求上配置超时、代理等:
HttpResponse<JsonNode> response = Unirest.post("http://localhost/post")
.connectTimeout(10000)
.socketTimeout(30000).proxy("127.0.0.1",8888)
.header("accept", "application/json")
.queryString("apiKey", "123")
.field("parameter", "value")
.field("firstname", "Gary")
.asJson();
# Java 11 HTTP Client
Java 11提供了HTTP Client Api,易用性得到了非常大提升:
HttpClient client = HttpClient.newBuilder()
.proxy(ProxySelector.of(new InetSocketAddress("127.0.0.1",8888)))
.connectTimeout(Duration.ofMillis(1))
.version(Version.HTTP_2)
.cookieHandler(new CookieHandler() {
@Override
public Map<String, List<String>> get(URI uri, Map<String, List<String>> requestHeaders) {
return requestHeaders;
}
@Override
public void put(URI uri, Map<String, List<String>> responseHeaders) {
}
})
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("https://www.baidu.com/"))
.timeout(Duration.ofMillis(2))
.build();
String body = client.send(request, BodyHandlers.ofString()).body();
System.out.println(body);
HttpClient是支持复用和并发的,他内置了一个Executors.newCachedThreadPool()线程池,你也可以自定义线程池:
Executor executor = ...;
HttpClient.newBuilder().executor(executor).build();
参考资料:Java11 HttpClient小试牛刀 (opens new window)
# 声明式HTTP客户端
如果想把HTTP请求绑定到接口上,就像这么做:
public interface MyClient {
@Request(
url = "http://localhost:8080/hello/user",
headers = "Accept: text/plain"
)
String sendRequest(@Query("uname") String username);
}
使用:
// 实例化Forest请求接口
MyClient myClient = Forest.client(MyClient.class);
// 调用Forest请求接口,并获取响应返回结果
String result = myClient.sendRequest("jack");
// 打印响应结果
System.out.println(result);
这就是所谓的声明式Client,可选框架有:
# 总结
本篇文章主要讲了Java Socket编程和Java HTTP编程,对于网络编程这个大领域来说,也是挂一漏万,比如其中对于UDP、WebSocket、gprc协议都没有讲到,对于Netty的使用也没有展开讲。
如果你对网络编程也感兴趣,期待你的批判和补充!