这篇其实是对之前所写内容的一个补充,大致内容就是 JDK 的 java.net + java.nio。
该类与 Socket 并没有任何血缘关系。
第一次看的时候,也会感觉:这个类维护的东西有点多、有点杂。
package com.example.angelmicroservicenetty.nio.nio;import cn.angel.project.ddd.util.SequenceUtil;
import com.example.angelmicroservicenetty.nio.Stub;
import com.google.common.base.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;/*** @author weng* @since 2023/3/4*/public class TcpServerStub2 implements Stub {@Overridepublic void service() throws IOException {Selector selector = Selector.open();ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// socket 绑定端口 -> 监听serverSocketChannel.socket().bind(new InetSocketAddress(PORT));// 通道 注册 -> selector// 设置非阻塞serverSocketChannel.configureBlocking(Boolean.FALSE).register(selector, SelectionKey.OP_ACCEPT);while (true) {int readyChannels = selector.select();if (NumberUtils.INTEGER_ZERO == readyChannels) {continue;}Set readyKeys = selector.selectedKeys();Iterator iterator = readyKeys.iterator();while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();iterator.remove();if (selectionKey.isAcceptable()) {// 建立连接,设置客户端通道非阻塞,注册到selector// 客户端注册到selectorSocketChannel clientSocketChannel = serverSocketChannel.accept();clientSocketChannel.configureBlocking(Boolean.FALSE).register(selector, SelectionKey.OP_READ);} else if (selectionKey.isReadable()) {SocketChannel clientSocketChannel = (SocketChannel) selectionKey.channel();ByteBuffer rdBuf = ByteBuffer.allocate(UNIT);int num = clientSocketChannel.read(rdBuf);if (num > 0) {rdBuf.flip();byte[] bytes = new byte[num];rdBuf.get(bytes);String dataString = new String(bytes, Charsets.UTF_8);String sequence = SequenceUtil.format("收到客户端请求:[{}]", dataString);System.err.println(sequence);// write阻塞ByteBuffer wrBuf = ByteBuffer.wrap(sequence.getBytes(Charsets.UTF_8));clientSocketChannel.write(wrBuf);}// 连接已关闭else if (num == -1) {IOUtils.closeQuietly(clientSocketChannel);}}}}}
}
package com.example.angelmicroservicenetty.nio.nio;import cn.angel.project.ddd.util.SequenceUtil;
import com.example.angelmicroservicenetty.nio.Stub;
import com.google.common.base.Charsets;
import org.apache.commons.io.IOUtils;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;/*** @author weng* @since 2023/3/4*/public class TcpClientStub implements Stub {@Overridepublic void service() throws IOException {SocketChannel clientSocketChannel = SocketChannel.open();clientSocketChannel.connect(new InetSocketAddress(HOST, PORT));DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateString = dateFormat.format(new Date());ByteBuffer wrBuf = ByteBuffer.wrap(dateString.getBytes(StandardCharsets.UTF_8));clientSocketChannel.write(wrBuf);ByteBuffer rdBuf = ByteBuffer.allocate(UNIT);int num;// read阻塞,等待数据读取if ((num = clientSocketChannel.read(rdBuf)) > 0) {rdBuf.flip();byte[] bytes = new byte[num];rdBuf.get(bytes);String rdString = new String(bytes, Charsets.UTF_8);String sequence = SequenceUtil.format("服务端响应:[{}]", rdString);System.err.println(sequence);}IOUtils.closeQuietly(clientSocketChannel);}
}
package com.example.angelmicroservicenetty.nio.aio;import cn.angel.project.ddd.support.json.Jsons;
import cn.angel.project.ddd.util.SequenceUtil;
import com.example.angelmicroservicenetty.nio.Stub;import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;/*** @author weng* @since 2023/3/5*/// 所有操作系统都不支持对文件I/O的非阻塞模式
// TODO Future、CompletionHandler 两种模式的底层
public class TcpServerStub implements Stub {private static final AcceptCompletionHandler HANDLER_ACCEPT;private static final RdCompletionHandler HANDLER_READ;static {HANDLER_ACCEPT = new AcceptCompletionHandler();HANDLER_READ = new RdCompletionHandler();}@Overridepublic void service() throws IOException {// 服务端监听socket的channelAsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(PORT));// 参数StubAttachment attachment = StubAttachment.builder().serverSocketChannel(serverSocketChannel).build();// 触发一下对 客户端通道 的连接serverSocketChannel.accept(attachment, HANDLER_ACCEPT);// 防止main线程退出(回顾并发可知:即native wait(0ms))// 语义:当前线程加入到别的线程中(等待别的活跃线程0ms,切换上下文)// 从抛出的InterruptedException也可得知try {Thread.currentThread().join();} catch (InterruptedException e) {e.printStackTrace();}}private static class AcceptCompletionHandler implements CompletionHandler {@Overridepublic void completed(AsynchronousSocketChannel clientSocketChannel, StubAttachment attachment) {try {SocketAddress clientAddress = clientSocketChannel.getRemoteAddress();System.out.println("收到客户端de连接:" + clientAddress);// 接受客户端连接之后,服务端应该调用accept()重新等待后续新连接的到来attachment.getServerSocketChannel().accept(attachment, this);// 构建新的的参数StubAttachment newAttachment = StubAttachment.builder().serverSocketChannel(attachment.getServerSocketChannel()).clientSocketChannel(clientSocketChannel).isReadMode(Boolean.TRUE).buf(ByteBuffer.allocate(UNIT)).build();// 触发一下对 客户端通道 的读clientSocketChannel.read(newAttachment.getBuf(), newAttachment, HANDLER_READ);} catch (IOException ex) {ex.printStackTrace();}}@Overridepublic void failed(Throwable exc, StubAttachment attachment) {System.err.println(obtainFailText("AcceptCompletionHandler", exc, attachment));}}private static class RdCompletionHandler implements CompletionHandler {@Overridepublic void completed(Integer result, StubAttachment attachment) {// 就看这个result的作用了System.err.println("RdCompletionHandler.completed.result="+result);if 
(attachment.isReadMode()) {ByteBuffer buf = attachment.getBuf();buf.flip();byte[] bytes = new byte[buf.limit()];buf.get(bytes);String msg = new String(bytes, StandardCharsets.UTF_8).trim();String seq = SequenceUtil.format("收到客户端的请求:[{}]", msg);System.err.println(seq);buf.clear();buf.put("服务端的响应".getBytes(StandardCharsets.UTF_8));buf.flip();// 将消息异步的写回客户端attachment.setReadMode(Boolean.FALSE);attachment.getClientSocketChannel().write(buf, attachment, this);}// 此时,往客户端写回数据也结束了else {// 继续从客户端读取新的请求
// attachment.setReadMode(Boolean.TRUE);
// attachment.getBuf().clear();
// attachment.getClientSocketChannel().read(attachment.getBuf(), attachment, this);// 另一个选择:结束连接try {attachment.getClientSocketChannel().close();} catch (IOException e) {e.printStackTrace();}}}@Overridepublic void failed(Throwable exc, StubAttachment attachment) {System.err.println(obtainFailText("RdCompletionHandler", exc, attachment));}}private static String obtainFailText(String prefix, Throwable exc, StubAttachment attachment) {return SequenceUtil.format(prefix + ".failed , exc:[{}], att:[{}]",exc.getMessage(),Jsons.NO_OP.stringify(attachment));}
}
package com.example.angelmicroservicenetty.nio.aio;import cn.angel.project.ddd.support.json.Jsons;
import cn.angel.project.ddd.util.SequenceUtil;
import com.example.angelmicroservicenetty.nio.Stub;
import lombok.SneakyThrows;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Future;/*** @author weng* @since 2023/3/5*/public class TcpClientStub implements Stub {private static final CompletionHandler HANDLER_RD_WR;static {HANDLER_RD_WR = new RdWrCompletionHandler();}@SneakyThrows@Overridepublic void service() throws IOException {AsynchronousSocketChannel clientSocketChannel = AsynchronousSocketChannel.open();// Future模式(用户代码自己来维护状态)// clientSocketChannel.connect(new InetSocketAddress(PORT), StubAttachment.builder().build(), new CompletionHandler() {});Future connFuture = clientSocketChannel.connect(new InetSocketAddress(HOST, PORT));// 阻塞至连接完成connFuture.get();StubAttachment attachment = StubAttachment.builder().isReadMode(Boolean.FALSE).clientSocketChannel(clientSocketChannel).buf(ByteBuffer.allocate(UNIT)).build();// 写到服务端的请求消息DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateString = dateFormat.format(new Date());attachment.getBuf().put(dateString.getBytes(StandardCharsets.UTF_8)).flip();// 当然,这里也是异步写到服务端的clientSocketChannel.write(attachment.getBuf(), attachment, HANDLER_RD_WR);Thread.currentThread().join();}private static class RdWrCompletionHandler implements CompletionHandler {@Overridepublic void completed(Integer result, StubAttachment attachment) {System.err.println("RdWrCompletionHandler.completed.result=" + result);// 读取服务端写回来的数据ByteBuffer buf = attachment.getBuf();if (attachment.isReadMode()) {buf.flip();byte[] bytes = new byte[buf.limit()];buf.get(bytes);String msg = new String(bytes, StandardCharsets.UTF_8);System.err.println("收到服务端响应的数据=" + msg);// 可以向服务端发送新的数据try {// 也可以索性关闭客户端通道的连接attachment.getClientSocketChannel().close();} catch (IOException e) {e.printStackTrace();}}else {// 写回调buf.flip();byte[] bytes = new byte[buf.limit()];buf.get(bytes);String dataString = new String(bytes, StandardCharsets.UTF_8);System.err.println("往服务端写入请求="+dataString);buf.clear();attachment.setReadMode(Boolean.TRUE);attachment.getClientSocketChannel().read(buf, attachment, 
this);}}@Overridepublic void failed(Throwable exc, StubAttachment attachment) {String failedText = SequenceUtil.format("RdWrCompletionHandler.failed, exc:[{}], attachment:[{}]", exc.getMessage(), Jsons.NO_OP.stringify(attachment));System.err.println(failedText);}}
}