Question:

In Netty, I found the cost of calling writeAndFlush for every message too high, so I batch messages as shown below. I first tried calling flushQueue from channelReadComplete, but addMessage may be called from another thread, so a message could sit in the queue until the next read event, possibly indefinitely. Does anyone have a better solution?

public class Sender {
    private final Session session;
    private final Queue<Message> messages = new ConcurrentLinkedQueue<>();

    public ScheduledFuture<?> future;

    public Sender(Session session) {
        this.session = session;
        future = session.getCtx().executor().scheduleWithFixedDelay(this::flushMessages, 50, 50, TimeUnit.MILLISECONDS);
    }

    public void addMessage(Message message) {
        messages.add(message);
    }

    public void cancel() {
        future.cancel(false);
    }

    public void flushMessages() {
        Message message = messages.poll();
        if (message != null) {
            ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer();
            do {
                buffer.writeBytes(message.getBuffer());
                message.release();

                message = messages.poll();
            } while (message != null);

            session.getCtx().writeAndFlush(buffer);
        }
    }
}


Batching writes so that you flush less often is the right idea, but a fixed 50 ms schedule can add up to 50 ms of latency to each message and wakes the event loop even when the queue is empty. Here are some improvements you could consider:

  1. Run all writes and flushes on the channel's event loop: the event loop is single-threaded, so the code that drains the queue needs no extra synchronization. Other threads only add to the concurrent queue and ask the event loop to flush.

  2. Chain flushes with a ChannelFutureListener: when a write completes, check the queue and flush again if more messages arrived in the meantime. This replaces the fixed scheduling delay and keeps messages from waiting for a timer or a read event.

Here's how you can modify your Sender class:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Sender {
    private final Session session;
    private final Queue<Message> messages = new ConcurrentLinkedQueue<>();

    public Sender(Session session) {
        this.session = session;
    }

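    // Safe to call from any thread: the queue is concurrent and the
    // actual write is handed off to the channel's event loop.
    public void addMessage(Message message) {
        messages.add(message);
        flushMessages();
    }

    private void flushMessages() {
        // Run on the event loop so only one thread ever drains the queue.
        session.getCtx().executor().execute(() -> {
            // Respect backpressure. Messages stay queued until the next
            // addMessage call; to drain them sooner, also trigger a flush
            // from channelWritabilityChanged in a handler.
            if (!session.getCtx().channel().isWritable()) {
                return;
            }

            Message message = messages.poll();
            if (message != null) {
                // Copy every queued message into one buffer so the whole
                // batch costs a single writeAndFlush.
                ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer();
                do {
                    buffer.writeBytes(message.getBuffer());
                    message.release();

                    message = messages.poll();
                } while (message != null);

                ChannelFuture future = session.getCtx().writeAndFlush(buffer);
                // When the write completes, flush again if more messages
                // arrived in the meantime.
                future.addListener((ChannelFutureListener) f -> {
                    if (!messages.isEmpty() && session.getCtx().channel().isWritable()) {
                        flushMessages();
                    }
                });
            }
        });
    }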
}

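Because every flush runs on the channel's single-threaded event loop, the queue is drained without extra locking, and each addMessage call triggers a flush instead of waiting for a timer or a read event. The ChannelFutureListener runs the flush again when messages arrive during a write. Note that messages queued while the channel is not writable wait until the next addMessage call, unless you also trigger a flush from channelWritabilityChanged.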
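
The Sender above submits one event-loop task per addMessage call. A possible refinement is to let a burst of calls share a single task by guarding the submission with an atomic flag. The following is a minimal sketch of that idea only, written against the JDK so it runs without Netty: CoalescingSender, eventLoop, writer and drain are hypothetical stand-ins (eventLoop for session.getCtx().executor(), writer for writeAndFlush), and messages are plain strings rather than Message objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for Sender: `eventLoop` plays the role of
// session.getCtx().executor() and `writer` the role of writeAndFlush.
class CoalescingSender {
    private final Queue<String> messages = new ConcurrentLinkedQueue<>();
    // True while a drain task has been submitted but has not started yet.
    private final AtomicBoolean flushScheduled = new AtomicBoolean();
    private final Executor eventLoop;
    private final Consumer<List<String>> writer;

    CoalescingSender(Executor eventLoop, Consumer<List<String>> writer) {
        this.eventLoop = eventLoop;
        this.writer = writer;
    }

    // Safe to call from any thread.
    void addMessage(String message) {
        messages.add(message);
        // Only the caller that flips the flag submits a task, so a burst of
        // messages costs one task instead of one per message.
        if (flushScheduled.compareAndSet(false, true)) {
            eventLoop.execute(this::drain);
        }
    }

    // Assumed to run on a single-threaded executor.
    private void drain() {
        // Clear the flag before polling: a message added from this point on
        // submits a fresh task, so nothing is left stranded in the queue.
        flushScheduled.set(false);
        List<String> batch = new ArrayList<>();
        for (String m = messages.poll(); m != null; m = messages.poll()) {
            batch.add(m);
        }
        if (!batch.isEmpty()) {
            writer.accept(batch); // one write and flush per batch
        }
    }
}
```

In a Netty-based Sender, drain would also check isWritable before polling, and a handler's channelWritabilityChanged would submit the same task so that messages queued during backpressure are sent once the channel becomes writable again.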
