接上一篇 ServerBootstrap的初始化
https://blog.51cto.com/483181/2119149
创新互联专注于企业营销型网站建设、网站重做改版、双河网站定制设计、自适应品牌网站建设、H5响应式网站、商城网站制作、集团公司官网建设、成都外贸网站建设、高端网站制作、响应式网页设计等建站业务,价格优惠性价比高,为双河等各大城市提供网站开发制作服务。
先看下调用的源代码
public void bind(int port) throws Exception {
...
try {
...
ChannelFuture f = b.bind(port).sync(); //bind过程
...
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
从上面代码可以看出几点:
继续看doBind
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister(); //1. init和register
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
上面这一段代码包含的东西就比较多了,先来看 initAndRegister
顾名思义,这个方法包含初始化和注册两个步骤,代码如下:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
...
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
从上面代码,我们可以看到几点:
public B channel(Class channelClass) {
return channelFactory(new ReflectiveChannelFactory(channelClass));
}
public class ReflectiveChannelFactory implements ChannelFactory {
@Override
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
}
}
}
它的newChannel方法也是非常的简单,直接实例化传入的channel对象,也就是NioServerSocketChannel (可以看上一篇初始化的分析)
代码如下:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
我们先看看NioServerSocketChannel的实现
先看下NioServerSocketChannel的继承关系
NioServerSocketChannel提供了一个无参构造函数,然后分别有SelectorProvider,ServerSocketChannel的构造函数,如下:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
}
}
private final ServerSocketChannelConfig config;
/**
* Create a new instance
*/
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
无参构造函数里面调用newSocket(xx),参数是SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
先看看SelectorProvider.provider()
private static SelectorProvider provider = null;
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
...
}
}
可以看到provider是个单例,不知道大家是否记得上上一篇文章(NioEventLoopGroup实例化)分析的时候也有provider,类型是KQueueSelectorProvider
具体可以看: https://blog.51cto.com/483181/2118817
回到newSocket里面,调用的是provider.openServerSocketChannel()
代码是SelectorProviderImpl里面,返回的是 ServerSocketChannel
public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this);
}
得到ServerSocketChannel之后,继续调用构造函数
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
这个构造方法里面做了两件事
看它的父类构造函数是怎么实现的
首先是AbstractNioChannel.java
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
}
}
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
然后继续看AbstractNioChannel的父类构造方法,也就是AbstractChannel
private final ChannelId id;
protected abstract AbstractUnsafe newUnsafe();
private final DefaultChannelPipeline pipeline;
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
可以看到这几点:
先看unsafe的初始化
在AbstractChannel里面,它是一个抽象类
protected abstract AbstractUnsafe newUnsafe();
实现类在子类AbstractNioMessageChannel里面,如下,类型是NioMessageUnsafe
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
NioMessageUnsafe代码后面再看。
继续看pipeline的初始化,初始化了一个 DefaultChannelPipeline
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
在DefaultChannelPipeline里面初始化了一个head和tail,分别是HeadContext和TailConext类型,而且head和tail组成双向链表。
head和tail的区别之一就是inbound和outbound值是相反的,如下:
节点 | inbound | outbound |
---|---|---|
head | false | true |
tail | true | false |
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
setAddComplete();
}
借一张图显示下ChannelInBound和ChannelOutBound,如下。head是发送出去的入口,tail是接收消息的入口。
另外我们来看一下添加一个ChannelHandler的流程,比如addLast
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
...
return this;
}
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
首先它初始化了一个DefaultChannelHandlerContext对象,里面封装了要add的channelHandler,这个很重要,在Netty的pipeLine里面,都是通过ChannelHandlerContext来描述的,不是直接添加channelHandler。
那,我们来总结下NioServerSocketChannel的初始化过程:
1. NioServerSocketChannel提供了一个无参构造函数,里面SelectorProvider DEFAULT_SELECTOR_PROVIDER,它是一个单例,类型是KQueueSelectorProvider。
2. 我们调用KQueueSelectorProvider.openServerSocketChannel()方法,得到一个ServerSocketChannel
3. 我们用生成的ServerSocketChannel对象创建了一个ServerSocketChannelConfig config,具体是NioServerSocketChannelConfig对象,存在NioServerSocketChannel里面
4. 我们用生成的ServerSocketChannel调用它的父类构造函数,先来到了AbstractNioChannel
5. 在AbstractNioChannel会把ServerSocketChannel存起来,变量是ch,然后把channel设置成非阻塞。
6. AbstractNioChannel还会把readInterestOp存起来,类型是SelectionKey.OP_ACCEPT
7. 继续调用父类构造函数,来到AbstractChannel
8. AbstractChannel里面的parent设置成null
9. AbstractChannel初始化channel id
10. AbstractChannel初始化unsafe,类型是NioMessageUnsafe.
11. AbstractChannel初始化pipeline,类型是DefaultChannelPipeline, 每个Channel都有一个自己的Pipeline
看完NioServerSocketChannel的实例化方法后,我们继续往下看init
abstract void init(Channel channel) throws Exception;
AbstractBootstrap里面的init(channel)方法是一个抽象方法,参数是Channel类型,其实就是上一步实例化好的NioServerSocketChannel对象。
具体实现方法在它的子类ServerBootstrap和Bootstrap(给客户端启动使用的),那我们是分析服务端的代码,所以看ServerBootstrap里面的实现。
void init(Channel channel) throws Exception {
final Map, Object> options = options0();
synchronized (options) { //1. 设置options
setChannelOptions(channel, options, logger);
}
final Map, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry, Object> e: attrs.entrySet()) { //设置attr属性
@SuppressWarnings("unchecked")
AttributeKey