Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock all read/writes of selected keys via the public key set #147

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,10 @@ public boolean isValid() {
return !hasOpInvalid() && !cancelled.get() && chann.isOpen() && sel.isOpen();
}

boolean isCancelled() {
return cancelled.get();
}

boolean hasOpInvalid() {
return (opsReady & OP_INVALID) != 0;
}

boolean isSelected() {
return readyOps() != 0;
}

@Override
public void cancel() {
if (!cancelled.compareAndSet(false, true) || !chann.isOpen()) {
Expand All @@ -86,18 +78,6 @@ public void cancel() {
sel.prepareRemove(this);
}

void cancelNoRemove() {
if (!cancelled.compareAndSet(false, true) || !chann.isOpen()) {
return;
}

cancel1();
}

private void cancel1() {
// FIXME
}

@Override
public int interestOps() {
return ops;
Expand Down
158 changes: 87 additions & 71 deletions junixsocket-common/src/main/java/org/newsclub/net/unix/AFSelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import java.nio.channels.Selector;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.nio.channels.spi.AbstractSelector;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
Expand All @@ -49,7 +51,8 @@ final class AFSelector extends AbstractSelector {

private final Set<SelectionKey> selectedKeysSet = new HashSet<>();
private final Set<SelectionKey> selectedKeysPublic = new UngrowableSet<>(selectedKeysSet);
private final Set<SelectionKey> cancelledKeysSet = new HashSet<>();

private final Deque<SelectionKey> cancelledKeys = new ArrayDeque<>();

private PollFd pollFd = null;

Expand Down Expand Up @@ -113,8 +116,10 @@ private int select0(int timeout) throws IOException {
throw new ClosedSelectorException();
}
pfd = pollFd = initPollFd(pollFd);
performRemove();
selectedKeysSet.clear();
synchronized (selectedKeysPublic) {
performRemove();
selectedKeysPublic.clear();
}
}
int num;
try {
Expand All @@ -124,7 +129,6 @@ private int select0(int timeout) throws IOException {
end();
}
synchronized (this) {
selectedKeysSet.clear();
pfd = pollFd;
if (pfd != null) {
AFSelectionKey[] keys = pfd.keys;
Expand All @@ -139,15 +143,24 @@ private int select0(int timeout) throws IOException {
}
}
}

if (num > 0) {
consumeAllBytesAfterPoll();
setOpsReady(pfd); // updates keysSelected and numKeysSelected
}
return selectedKeysSet.size();

synchronized (selectedKeysPublic) {
selectedKeysPublic.clear();
if (num > 0) {
setOpsReady(pfd); // updates keysSelected and numKeysSelected
}
return selectedKeysPublic.size();
}
}
}

private synchronized void consumeAllBytesAfterPoll() throws IOException {
private void consumeAllBytesAfterPoll() throws IOException {
assert Thread.holdsLock(this);

if (pollFd == null) {
return;
}
Expand Down Expand Up @@ -181,80 +194,86 @@ private synchronized void consumeAllBytesAfterPoll() throws IOException {
}
}

private synchronized void setOpsReady(PollFd pfd) {
private void setOpsReady(PollFd pfd) {
assert Thread.holdsLock(this);
assert Thread.holdsLock(selectedKeysPublic);

if (pfd != null) {
for (int i = 1; i < pfd.rops.length; i++) {
int rops = pfd.rops[i];
AFSelectionKey key = pfd.keys[i];
key.setOpsReady(rops);
if (rops != 0 && key.isValid()) {
if (rops != 0) {
selectedKeysSet.add(key);
}
if (!key.isValid()) {
key.cancel();
}
}
}
}

@SuppressWarnings({"resource", "PMD.CognitiveComplexity"})
private PollFd initPollFd(PollFd existingPollFd) throws IOException {
synchronized (this) {
for (Iterator<AFSelectionKey> it = keysRegisteredKeySet.iterator(); it.hasNext();) {
AFSelectionKey key = it.next();
if (!key.getAFCore().fd.valid() || key.hasOpInvalid()) {
key.cancelNoRemove();
it.remove();
existingPollFd = null;
} else {
key.setOpsReady(0);
}
assert Thread.holdsLock(this);

for (Iterator<AFSelectionKey> it = keysRegisteredKeySet.iterator(); it.hasNext();) {
AFSelectionKey key = it.next();
if (!key.getAFCore().fd.valid() || !key.isValid()) {
key.cancel();
it.remove();
existingPollFd = null;
} else {
key.setOpsReady(0);
}
}

if (existingPollFd != null && //
existingPollFd.keys != null && //
(existingPollFd.keys.length - 1) == keysRegistered.size()) {
boolean needsUpdate = false;
int i = 1;
for (AFSelectionKey key : keysRegisteredKeySet) {
if (existingPollFd.keys[i] != key || !key.isValid()) { // NOPMD
needsUpdate = true;
break;
}
existingPollFd.ops[i] = key.interestOps();

i++;
if (existingPollFd != null && //
existingPollFd.keys != null && //
(existingPollFd.keys.length - 1) == keysRegistered.size()) {
boolean needsUpdate = false;
int i = 1;
for (AFSelectionKey key : keysRegisteredKeySet) {
if (existingPollFd.keys[i] != key || !key.isValid()) { // NOPMD
needsUpdate = true;
break;
}
existingPollFd.ops[i] = key.interestOps();

if (!needsUpdate) {
return existingPollFd;
}
i++;
}

int keysToPoll = keysRegistered.size();
for (AFSelectionKey key : keysRegisteredKeySet) {
if (!key.isValid()) {
keysToPoll--;
}
if (!needsUpdate) {
return existingPollFd;
}
}

int size = keysToPoll + 1;
FileDescriptor[] fds = new FileDescriptor[size];
int[] ops = new int[size];
int keysToPoll = keysRegistered.size();
for (AFSelectionKey key : keysRegisteredKeySet) {
if (!key.isValid()) {
keysToPoll--;
}
}

AFSelectionKey[] keys = new AFSelectionKey[size];
fds[0] = selectorPipe.sourceFD();
ops[0] = SelectionKey.OP_READ;
int size = keysToPoll + 1;
FileDescriptor[] fds = new FileDescriptor[size];
int[] ops = new int[size];

int i = 1;
for (AFSelectionKey key : keysRegisteredKeySet) {
if (!key.isValid()) {
continue;
}
keys[i] = key;
fds[i] = key.getAFCore().fd;
ops[i] = key.interestOps();
i++;
AFSelectionKey[] keys = new AFSelectionKey[size];
fds[0] = selectorPipe.sourceFD();
ops[0] = SelectionKey.OP_READ;

int i = 1;
for (AFSelectionKey key : keysRegisteredKeySet) {
if (!key.isValid()) {
continue;
}
return new PollFd(keys, fds, ops);
keys[i] = key;
fds[i] = key.getAFCore().fd;
ops[i] = key.interestOps();
i++;
}
return new PollFd(keys, fds, ops);
}

@Override
Expand All @@ -263,11 +282,11 @@ protected void implCloseSelector() throws IOException {
Set<SelectionKey> keys;
synchronized (this) {
keys = keys();
for (SelectionKey key : keys) {
key.cancel();
}
keysRegistered.clear();
}
for (SelectionKey key : keys) {
((AFSelectionKey) key).cancelNoRemove();
}
selectorPipe.close();
}

Expand Down Expand Up @@ -295,26 +314,23 @@ public Selector wakeup() {
return this;
}

synchronized void remove(AFSelectionKey key) {
selectedKeysSet.remove(key);
deregister(key);
pollFd = null;
}

void prepareRemove(AFSelectionKey key) {
synchronized (cancelledKeysSet) {
cancelledKeysSet.add(key);
synchronized (cancelledKeys) {
cancelledKeys.addLast(key);
}
}

void performRemove() {
synchronized (cancelledKeysSet) {
for (SelectionKey key : cancelledKeysSet) {
selectedKeysSet.remove(key);
assert Thread.holdsLock(this);
assert Thread.holdsLock(selectedKeysPublic);

synchronized (cancelledKeys) {
SelectionKey key;
while ((key = cancelledKeys.pollFirst()) != null) {
selectedKeysPublic.remove(key);
deregister((AFSelectionKey) key);
pollFd = null;
}
cancelledKeysSet.clear();
}
}

Expand Down