Lettuce源码分析

Posted by CaiJiahe on March 8, 2018

0x01 简介

Lettuce是一个可伸缩、线程安全的Redis客户端。多个线程可以共享同一个RedisConnection。它利用Netty框架来高效地管理多个连接。

0x02 向server发送请求

/**
 * Excerpt of the connection interface. Only {@code dispatch} is shown; the
 * other members are elided by the article.
 */
public interface StatefulConnection<K, V> extends AutoCloseable {
	
	...
	
    /**
     * Dispatches a command on this connection.
     *
     * @param command the command to dispatch
     * @param <T> result type carried by the command
     * @return a command with the same type parameters as the argument
     *         (the excerpt does not show whether it is the same instance)
     */
    <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command);

}

/**
 * Excerpt of the abstract handler that forwards commands to a
 * {@code RedisChannelWriter}. Other members are elided by the article.
 */
public abstract class RedisChannelHandler<K, V> implements Closeable, ConnectionFacade {

    // Writer that every dispatched command is handed to.
    private final RedisChannelWriter channelWriter;
	
	...

    /**
     * Logs the command when debug logging is enabled, then passes it to the
     * channel writer.
     *
     * @param cmd the command to send
     * @param <T> result type carried by the command
     * @return whatever {@code channelWriter.write(cmd)} returns
     */
    protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {

        if (debugEnabled) {
            logger.debug("dispatching command {}", cmd);
        }

        return channelWriter.write(cmd);
    }

}

StatefulConnection接口中定义了dispatch方法。在抽象类RedisChannelHandler对该方法的实现中,调用了channelWriter.write方法;这个调用最终会到达CommandHandler.write方法,将命令按Redis协议序列化后发送给redis server。

// A Netty ChannelHandler responsible for handling responses from the Redis server
// and for sending requests to it. The class body is elided in this excerpt.
public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCommands {
	...
}

0x03 解析server响应

/**
 * Excerpt of the inbound (read) path of {@code CommandHandler}. Fields and
 * methods not relevant to decoding are elided by the article.
 */
public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCommands {

    // Accumulation buffer: every inbound chunk is appended here before decoding.
    private ByteBuf buffer;

	...

    /**
     * Appends the inbound bytes to {@code buffer} and runs the decode loop on it.
     *
     * @param ctx the channel handler context
     * @param msg the inbound message; cast to {@code ByteBuf} without a type check
     */
	@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
	
        ByteBuf input = (ByteBuf) msg;

			...
			
        try {
            buffer.writeBytes(input);

            decode(ctx, buffer);
        } finally {
            // The inbound buffer is released whether or not decoding succeeded.
            input.release();
        }
    }

    /**
     * Decodes replies from {@code buffer} for as long as {@code canDecode(buffer)}
     * holds, completing one command per fully decoded reply.
     *
     * @param ctx the channel handler context; closed if decoding throws
     * @param buffer the accumulated inbound bytes
     */
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
	
		...
		
        while (canDecode(buffer)) {

            // The command at the head of the stack is looked at but not removed here.
            RedisCommand<?, ?, ?> command = stack.peek();

            try {
                // NOTE(review): this three-argument decode overload is not shown in the
                // excerpt; a false result stops the loop, presumably because the reply
                // is not yet complete - confirm against the full source.
                if (!decode(ctx, buffer, command)) {
                    return;
                }
            } catch (Exception e) {
                // Any decoding failure closes the channel context, then propagates.
                ctx.close();
                throw e;
            }

			command.complete();
              
            afterComplete(ctx, command);
        }
    }
	
	...
	
}

从channelRead中得到EventLoop读取到的数据,然后对其解码。

/**
 * Excerpt of {@code CommandHandler} showing how decoding is delegated to
 * {@code RedisStateMachine}. Other members are elided by the article.
 */
public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCommands {
	
    // State machine that performs the actual reply parsing.
    private final RedisStateMachine rsm = new RedisStateMachine();
	
	...
	
    /**
     * Decodes a reply into the output object obtained from the command.
     *
     * @param ctx the channel handler context (not used in the visible part)
     * @param buffer the bytes to decode
     * @param command the command whose output receives the decoded values
     * @return {@code false} when the underlying decode returns {@code false},
     *         {@code true} otherwise; the steps in between are elided
     */
    private boolean decode0(ChannelHandlerContext ctx, ByteBuf buffer, RedisCommand<?, ?, ?> command) {

        if (!decode(buffer, command, command.getOutput())) {

            ...

            return false;
        }

        ...

        return true;
    }
	
    /**
     * Thin delegate to {@code rsm.decode}.
     *
     * @return the result of {@code rsm.decode(buffer, command, output)}
     */
    protected boolean decode(ByteBuf buffer, RedisCommand<?, ?, ?> command, CommandOutput<?, ?, ?> output) {
        return rsm.decode(buffer, command, output);
    }
	
}

解码的时候会从command里得到相应的Output对象,然后使用RedisStateMachine进行解码。

/**
 * Excerpt of the state machine that parses server replies from a {@code ByteBuf}
 * and feeds the parsed values into a {@code CommandOutput}. Fields ({@code stack},
 * {@code QUEUED}, logger) and helper methods are not shown in the article.
 */
public class RedisStateMachine {

    /** One entry of the parse stack: the kind of reply being read and a counter. */
    static class State {
        // NOTE(review): these presumably mirror the Redis protocol reply kinds, with
        // BYTES as the internal "bulk payload" phase - readReplyType is not shown.
        enum Type {
            SINGLE, ERROR, INTEGER, BULK, MULTI, BYTES
        }

        // null until the reply type has been read from the buffer.
        Type type = null;
        // -1 means "not read yet"; used as elements remaining (MULTI) or bytes to read (BYTES).
        int count = -1;
    }

    /**
     * Parses as much of a reply as the buffer currently allows.
     *
     * @param buffer the bytes to parse
     * @param command passed through to the {@code safeSet*} / {@code safeMulti} helpers
     * @param output receives the parsed values; when {@code null} nothing is parsed
     * @return {@code true} if the state stack is empty on exit, {@code false} otherwise
     * @throws IllegalStateException if a state carries a type the switch does not handle
     */
    public boolean decode(ByteBuf buffer, RedisCommand<?, ?, ?> command, CommandOutput<?, ?, ?> output) {
        int length, end;
        ByteBuffer bytes;

        // Start a new reply with a fresh, untyped state.
        if (isEmpty(stack)) {
            add(stack, new State());
        }

        if (output == null) {
            return isEmpty(stack);
        }

        loop:

        while (!isEmpty(stack)) {
            State state = peek(stack);

            if (state.type == null) {
                // Nothing to read the type from: leave the while loop and keep the state.
                if (!buffer.isReadable()) {
                    break;
                }
                state.type = readReplyType(buffer);
                buffer.markReaderIndex();
            }

            // In every case, "break loop" leaves the state on the stack so that a later
            // call can pick it up again (presumably once more bytes have arrived).
            switch (state.type) {
                case SINGLE:
                    if ((bytes = readLine(buffer)) == null) {
                        break loop;
                    }

                    // A line equal to QUEUED is not forwarded to the output.
                    if (!QUEUED.equals(bytes)) {
                        safeSet(output, bytes, command);
                    }
                    break;
                case ERROR:
                    if ((bytes = readLine(buffer)) == null) {
                        break loop;
                    }
                    safeSetError(output, bytes, command);
                    break;
                case INTEGER:
                    if ((end = findLineEnd(buffer)) == -1) {
                        break loop;
                    }
                    long integer = readLong(buffer, buffer.readerIndex(), end);
                    safeSet(output, integer, command);
                    break;
                case BULK:
                    if ((end = findLineEnd(buffer)) == -1) {
                        break loop;
                    }
                    length = (int) readLong(buffer, buffer.readerIndex(), end);
                    if (length == -1) {
                        // A length of -1 is reported to the output as a null value.
                        safeSet(output, null, command);
                    } else {
                        // Same state, second phase: read the payload as BYTES.
                        // NOTE(review): the "+ 2" presumably covers a trailing 2-byte
                        // line terminator - confirm against readBytes.
                        state.type = BYTES;
                        state.count = length + 2;
                        buffer.markReaderIndex();
                        continue loop;
                    }
                    break;
                case MULTI:
                    // First visit: read the element count and announce it to the output.
                    if (state.count == -1) {
                        if ((end = findLineEnd(buffer)) == -1) {
                            break loop;
                        }
                        length = (int) readLong(buffer, buffer.readerIndex(), end);
                        state.count = length;
                        buffer.markReaderIndex();
                        safeMulti(output, state.count, command);
                    }

                    // No elements left: leave the switch so this state gets removed below.
                    if (state.count <= 0) {
                        break;
                    }

                    // Otherwise consume one slot and put a fresh state at the front of the
                    // stack for the next element.
                    state.count--;
                    addFirst(stack, new State());

                    continue loop;
                case BYTES:
                    if ((bytes = readBytes(buffer, state.count)) == null) {
                        break loop;
                    }
                    safeSet(output, bytes, command);
                    break;
                default:
                    throw new IllegalStateException("State " + state.type + " not supported");
            }

            // Reached only when the current state was fully handled: mark the reader
            // index, remove the state and report the remaining stack size to the output.
            buffer.markReaderIndex();
            remove(stack);

            output.complete(size(stack));
        }

        if (debugEnabled) {
            logger.debug("Decoded {}, empty stack: {}", command, isEmpty(stack));
        }

        return isEmpty(stack);
    }
	
}

这个方法是用来解析redis server返回来的协议,在解析完成后返回到CommandHandler.decode方法,执行command.complete()方法。

/**
 * Excerpt of the command decorator. Only {@code complete()} is shown; the
 * wrapped {@code command} field, {@code ONCOMPLETE} and {@code EMPTY} are
 * declared outside this excerpt.
 */
public class CommandWrapper<K, V, T> implements RedisCommand<K, V, T>, CompleteableCommand<T>, DecoratedCommand<K, V, T> {
	
    /**
     * Completes the wrapped command, then invokes the registered completion
     * consumers with the output's value, or with {@code null} when there is no
     * output.
     */
    public void complete() {

        command.complete();

        Consumer[] consumers = ONCOMPLETE.get(this);
        // The consumers run only if this call swaps the array for EMPTY itself, so a
        // given array is not processed again after a successful swap.
        // NOTE(review): ONCOMPLETE is presumably an atomic field updater - confirm.
        if (consumers != EMPTY && ONCOMPLETE.compareAndSet(this, consumers, EMPTY)) {

            for (Consumer<? super T> consumer : consumers) {
                if (getOutput() != null) {
                    consumer.accept(getOutput().get());
                } else {
                    consumer.accept(null);
                }
            }
        }
    }
	
}

/**
 * Excerpt of the command wrapper that forwards a completed command's result
 * to a subscription. The {@code completed}, {@code dissolve} and
 * {@code subscription} members and {@code onError} are declared outside this
 * excerpt.
 */
private static class SubscriptionCommand<K, V, T> extends CommandWrapper<K, V, T> implements DemandAware.Sink {

    /**
     * Runs the inherited completion, then pushes the output's result to the
     * subscription. Does nothing if {@code completed} is already set.
     */
    public void complete() {

		if (completed) {
			return;
		}

		try {
			// Inherited CommandWrapper.complete() runs first.
			super.complete();

			if (getOutput() != null) {
				Object result = getOutput().get();

				// An output error is reported through onError and nothing else is emitted.
				if (getOutput().hasError()) {
					onError(new RedisCommandExecutionException(getOutput().getError()));
					completed = true;
					return;
				}

				// Results of StreamingOutput instances are not pushed from here.
				if (!(getOutput() instanceof StreamingOutput<?>) && result != null) {

					if (dissolve && result instanceof Collection) {

						Collection<T> collection = (Collection<T>) result;

						// With dissolve set, a collection is emitted element by element;
						// null elements are skipped.
						for (T t : collection) {
							if (t != null) {
								subscription.onNext(t);
							}
						}
					} else {
						// Otherwise the whole result is emitted as a single item.
						subscription.onNext((T) result);
					}
				}
			}

			subscription.onAllDataRead();
		} finally {
			// Set on every exit path, including when an exception is thrown.
			completed = true;
		}
	}
}

SubscriptionCommand继承自CommandWrapper,它的complete()方法会先通过super.complete()调用CommandWrapper.complete(),然后检查Output对象中的result,并调用Subscription的onNext方法向订阅者推送消息。最后调用subscription.onAllDataRead()方法推送onComplete事件收尾。