Skip to content

improve send performance faster #1477

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions src/main/java/org/java_websocket/WebSocketImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,9 @@ private void decodeFrames(ByteBuffer socketBuffer) {
try {
frames = draft.translateFrame(socketBuffer);
for (Framedata f : frames) {
log.trace("matched frame: {}", f);
if (log.isTraceEnabled()) {
log.trace("matched frame: {}", f);
}
draft.processFrame(this, f);
}
} catch (LimitExceededException e) {
Expand Down Expand Up @@ -671,11 +673,42 @@ private void send(Collection<Framedata> frames) {
if (frames == null) {
throw new IllegalArgumentException();
}
List<ByteBuffer> outgoingFrames = createOutgoingBinaryFrame(frames);
write(outgoingFrames);
}

private List<ByteBuffer> createOutgoingBinaryFrame(Collection<Framedata> frames) {
// most scene send one message, use singletonList which is more fast than singletonList
if (frames.size() == 1) {
for (Framedata f : frames) {
if (log.isTraceEnabled()) {
log.trace("send frame: {}", f);
}
return Collections.singletonList(
draft.createBinaryFrame(f)
);
}
}
ArrayList<ByteBuffer> outgoingFrames = new ArrayList<>();
for (Framedata f : frames) {
log.trace("send frame: {}", f);
if (log.isTraceEnabled()) {
log.trace("send frame: {}", f);
}
outgoingFrames.add(draft.createBinaryFrame(f));
}
return outgoingFrames;
}

public ByteBuffer createEncodedBinaryFrame(Framedata framedata) {
return draft.createBinaryFrame(framedata);
}

public void sendEncodedBinaryFrame(ByteBuffer binaryFrame) {
List<ByteBuffer> outgoingFrames= Collections.singletonList(binaryFrame);
sendEncodedBinaryFrames(outgoingFrames);
}

public void sendEncodedBinaryFrames(List<ByteBuffer> outgoingFrames) {
write(outgoingFrames);
}

Expand Down Expand Up @@ -734,8 +767,11 @@ public void startHandshake(ClientHandshakeBuilder handshakedata)
}

private void write(ByteBuffer buf) {
log.trace("write({}): {}", buf.remaining(),
buf.remaining() > 1000 ? "too big to display" : new String(buf.array()));
// should check isTraceEnabled() to avoid performance down because of log
if (log.isTraceEnabled()) {
log.trace("write({}): {}", buf.remaining(),
buf.remaining() > 1000 ? "too big to display" : new String(buf.array()));
}

outQueue.add(buf);
wsl.onWriteDemand(this);
Expand Down
67 changes: 58 additions & 9 deletions src/main/java/org/java_websocket/drafts/Draft_6455.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
Expand Down Expand Up @@ -509,11 +510,31 @@ private ByteBuffer createByteBufferFromFramedata(Framedata framedata) {
throw new IllegalStateException("Size representation not supported/specified");
}
if (mask) {
ByteBuffer maskkey = ByteBuffer.allocate(4);
maskkey.putInt(reuseableRandom.nextInt());
buf.put(maskkey.array());
for (int i = 0; mes.hasRemaining(); i++) {
buf.put((byte) (mes.get() ^ maskkey.get(i % 4)));
int maskInt = reuseableRandom.nextInt();
if (useFastMask) {
//default ByteOrder.BIG_ENDIAN
ByteBuffer maskLongkey = ByteBuffer.allocate(8);
maskLongkey.putInt(maskInt);
maskLongkey.putInt(maskInt);
buf.putInt(maskInt);
// n / 8 eq n >> 3
int length = mes.remaining() >> 3;
long maskLong = maskLongkey.getLong(0);
for (int i = 0; i < length; i++) {
buf.putLong(mes.getLong() ^ maskLong);
}
for (int i = 0; mes.hasRemaining(); i++) {
// x % 2^n 为 x & (2^n - 1)
buf.put((byte) (mes.get() ^ maskLongkey.get(i & 3)));
}
} else {
ByteBuffer maskkey = ByteBuffer.allocate(4);
maskkey.putInt(maskInt);
buf.put(maskkey.array());
for (int i = 0; mes.hasRemaining(); i++) {
// x % 2^n 为 x & (2^n - 1)
buf.put((byte) (mes.get() ^ maskkey.get(i & 3)));
}
}
} else {
buf.put(mes);
Expand All @@ -525,6 +546,8 @@ private ByteBuffer createByteBufferFromFramedata(Framedata framedata) {
return buf;
}

public static boolean useFastMask = true;

private Framedata translateSingleFrame(ByteBuffer buffer)
throws IncompleteException, InvalidDataException {
if (buffer == null) {
Expand Down Expand Up @@ -556,10 +579,30 @@ private Framedata translateSingleFrame(ByteBuffer buffer)

ByteBuffer payload = ByteBuffer.allocate(checkAlloc(payloadlength));
if (mask) {
byte[] maskskey = new byte[4];
buffer.get(maskskey);
for (int i = 0; i < payloadlength; i++) {
payload.put((byte) (buffer.get(/*payloadstart + i*/) ^ maskskey[i % 4]));
if (useFastMask) {
byte[] maskskey = new byte[4];
buffer.get(maskskey);
ByteBuffer maskLongKey = ByteBuffer.allocate(8);
maskLongKey.put(maskskey);
maskLongKey.put(maskskey);

// n / 8 eq n >> 3
int length = payloadlength >> 3;
long maskLong = maskLongKey.getLong(0);
//fast mask use 8 byte long
for (int i = 0; i < length; i++) {
payload.putLong(buffer.getLong() ^ maskLong);
}
//mask remain bytes
for (int i = (length << 3); i < payloadlength; i++) {
payload.put((byte) (buffer.get(/*payloadstart + i*/) ^ maskskey[i % 4]));
}
} else {
byte[] maskskey = new byte[4];
buffer.get(maskskey);
for (int i = 0; i < payloadlength; i++) {
payload.put((byte) (buffer.get(/*payloadstart + i*/) ^ maskskey[i % 4]));
}
}
} else {
payload.put(buffer.array(), buffer.position(), payload.limit());
Expand Down Expand Up @@ -592,6 +635,11 @@ private Framedata translateSingleFrame(ByteBuffer buffer)
(frame.getPayloadData().remaining() > 1000 ? "too big to display"
: new String(frame.getPayloadData().array())));
}

if (frame instanceof TextFrame) {
((TextFrame) frame).setCanSkipCheckUTF8PlayLoad(true);
}

frame.isValid();
return frame;
}
Expand Down Expand Up @@ -791,6 +839,7 @@ public List<Framedata> createFrames(ByteBuffer binary, boolean mask) {
public List<Framedata> createFrames(String text, boolean mask) {
TextFrame curframe = new TextFrame();
curframe.setPayload(ByteBuffer.wrap(Charsetfunctions.utf8Bytes(text)));
curframe.setCanSkipCheckUTF8PlayLoad(true);
curframe.setTransferemasked(mask);
try {
curframe.isValid();
Expand Down
26 changes: 24 additions & 2 deletions src/main/java/org/java_websocket/framing/TextFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,19 @@

package org.java_websocket.framing;

import java.nio.ByteBuffer;
import org.java_websocket.enums.Opcode;
import org.java_websocket.exceptions.InvalidDataException;
import org.java_websocket.util.Charsetfunctions;


/**
* Class to represent a text frames
*/
public class TextFrame extends DataFrame {

boolean canSkipCheckUTF8PlayLoad = false;

/**
* constructor which sets the opcode of this frame to text
*/
Expand All @@ -44,8 +48,26 @@ public TextFrame() {
@Override
public void isValid() throws InvalidDataException {
super.isValid();
if (!Charsetfunctions.isValidUTF8(getPayloadData())) {
throw new InvalidDataException(CloseFrame.NO_UTF8, "Received text is no valid utf8 string!");
if (!canSkipCheckUTF8PlayLoad) {
if (!Charsetfunctions.isValidUTF8(getPayloadData())) {
throw new InvalidDataException(CloseFrame.NO_UTF8, "Received text is no valid utf8 string!");
}
canSkipCheckUTF8PlayLoad = true;
}
}

@Override
public void setPayload(ByteBuffer payload) {
super.setPayload(payload);
canSkipCheckUTF8PlayLoad = false;
}

public boolean hasCheckUTF8PlayLoad() {
return canSkipCheckUTF8PlayLoad;
}

public void setCanSkipCheckUTF8PlayLoad(boolean canSkipCheckUTF8PlayLoad) {
this.canSkipCheckUTF8PlayLoad = canSkipCheckUTF8PlayLoad;
}

}