Skip to content

Commit

Permalink
Lock all read/writes of selected keys via the public key set
Browse files Browse the repository at this point in the history
This ensures that the selector and its clients use the same lock to
synchronize access to the selected keys

Changes:
* Always lock on `selectedKeysPublic` to be consistent with clients
* Synchronize on `selectedKeysPublic` when adding or clearing selected
  keys
* Reorder the calls to `consumeAllBytesAfterPoll` and
  `selectedKeysPublic.clear()` after polling in order to push down the
  lock on `selectedKeysPublic`. This should be safe since neither the
  channel closing loop nor `consumeAllBytesAfterPoll` operates on the
  selected key set
* Remove redudnant use of `synchronized` on methods that are always
  called under a lock on `this`. Use assertions to document what locks
  are required for the given method instead
  • Loading branch information
ThePumpingLemma committed Nov 9, 2023
1 parent 6818639 commit 51636c7
Showing 1 changed file with 71 additions and 55 deletions.
126 changes: 71 additions & 55 deletions junixsocket-common/src/main/java/org/newsclub/net/unix/AFSelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,10 @@ private int select0(int timeout) throws IOException {
throw new ClosedSelectorException();
}
pfd = pollFd = initPollFd(pollFd);
performRemove();
selectedKeysSet.clear();
synchronized (selectedKeysPublic) {
performRemove();
selectedKeysPublic.clear();
}
}
int num;
try {
Expand All @@ -127,7 +129,6 @@ private int select0(int timeout) throws IOException {
end();
}
synchronized (this) {
selectedKeysSet.clear();
pfd = pollFd;
if (pfd != null) {
AFSelectionKey[] keys = pfd.keys;
Expand All @@ -142,15 +143,24 @@ private int select0(int timeout) throws IOException {
}
}
}

if (num > 0) {
consumeAllBytesAfterPoll();
setOpsReady(pfd); // updates keysSelected and numKeysSelected
}
return selectedKeysSet.size();

synchronized (selectedKeysPublic) {
selectedKeysPublic.clear();
if (num > 0) {
setOpsReady(pfd); // updates keysSelected and numKeysSelected
}
return selectedKeysPublic.size();
}
}
}

private synchronized void consumeAllBytesAfterPoll() throws IOException {
private void consumeAllBytesAfterPoll() throws IOException {
assert Thread.holdsLock(this);

if (pollFd == null) {
return;
}
Expand Down Expand Up @@ -184,7 +194,10 @@ private synchronized void consumeAllBytesAfterPoll() throws IOException {
}
}

private synchronized void setOpsReady(PollFd pfd) {
private void setOpsReady(PollFd pfd) {
assert Thread.holdsLock(this);
assert Thread.holdsLock(selectedKeysPublic);

if (pfd != null) {
for (int i = 1; i < pfd.rops.length; i++) {
int rops = pfd.rops[i];
Expand All @@ -202,65 +215,65 @@ private synchronized void setOpsReady(PollFd pfd) {

@SuppressWarnings({"resource", "PMD.CognitiveComplexity"})
private PollFd initPollFd(PollFd existingPollFd) throws IOException {
synchronized (this) {
for (Iterator<AFSelectionKey> it = keysRegisteredKeySet.iterator(); it.hasNext();) {
AFSelectionKey key = it.next();
if (!key.getAFCore().fd.valid() || !key.isValid()) {
key.cancel();
it.remove();
existingPollFd = null;
} else {
key.setOpsReady(0);
}
}
assert Thread.holdsLock(this);

if (existingPollFd != null && //
existingPollFd.keys != null && //
(existingPollFd.keys.length - 1) == keysRegistered.size()) {
boolean needsUpdate = false;
int i = 1;
for (AFSelectionKey key : keysRegisteredKeySet) {
if (existingPollFd.keys[i] != key || !key.isValid()) { // NOPMD
needsUpdate = true;
break;
}
existingPollFd.ops[i] = key.interestOps();
for (Iterator<AFSelectionKey> it = keysRegisteredKeySet.iterator(); it.hasNext();) {
AFSelectionKey key = it.next();
if (!key.getAFCore().fd.valid() || !key.isValid()) {
key.cancel();
it.remove();
existingPollFd = null;
} else {
key.setOpsReady(0);
}
}

i++;
if (existingPollFd != null && //
existingPollFd.keys != null && //
(existingPollFd.keys.length - 1) == keysRegistered.size()) {
boolean needsUpdate = false;
int i = 1;
for (AFSelectionKey key : keysRegisteredKeySet) {
if (existingPollFd.keys[i] != key || !key.isValid()) { // NOPMD
needsUpdate = true;
break;
}
existingPollFd.ops[i] = key.interestOps();

if (!needsUpdate) {
return existingPollFd;
}
i++;
}

int keysToPoll = keysRegistered.size();
for (AFSelectionKey key : keysRegisteredKeySet) {
if (!key.isValid()) {
keysToPoll--;
}
if (!needsUpdate) {
return existingPollFd;
}
}

int keysToPoll = keysRegistered.size();
for (AFSelectionKey key : keysRegisteredKeySet) {
if (!key.isValid()) {
keysToPoll--;
}
}

int size = keysToPoll + 1;
FileDescriptor[] fds = new FileDescriptor[size];
int[] ops = new int[size];
int size = keysToPoll + 1;
FileDescriptor[] fds = new FileDescriptor[size];
int[] ops = new int[size];

AFSelectionKey[] keys = new AFSelectionKey[size];
fds[0] = selectorPipe.sourceFD();
ops[0] = SelectionKey.OP_READ;
AFSelectionKey[] keys = new AFSelectionKey[size];
fds[0] = selectorPipe.sourceFD();
ops[0] = SelectionKey.OP_READ;

int i = 1;
for (AFSelectionKey key : keysRegisteredKeySet) {
if (!key.isValid()) {
continue;
}
keys[i] = key;
fds[i] = key.getAFCore().fd;
ops[i] = key.interestOps();
i++;
int i = 1;
for (AFSelectionKey key : keysRegisteredKeySet) {
if (!key.isValid()) {
continue;
}
return new PollFd(keys, fds, ops);
keys[i] = key;
fds[i] = key.getAFCore().fd;
ops[i] = key.interestOps();
i++;
}
return new PollFd(keys, fds, ops);
}

@Override
Expand Down Expand Up @@ -308,10 +321,13 @@ void prepareRemove(AFSelectionKey key) {
}

void performRemove() {
assert Thread.holdsLock(this);
assert Thread.holdsLock(selectedKeysPublic);

synchronized (cancelledKeys) {
SelectionKey key;
while ((key = cancelledKeys.pollFirst()) != null) {
selectedKeysSet.remove(key);
selectedKeysPublic.remove(key);
deregister((AFSelectionKey) key);
pollFd = null;
}
Expand Down

0 comments on commit 51636c7

Please sign in to comment.