Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

solve the issue [#80](https://github.com/barchart/barchart-udt/issues/80) #81

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion barchart-udt-core/.classpath
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/java-6-oracle">
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
/**
* {@link ServerSocket} - like wrapper for {@link SocketUDT}
*/
public class NetServerSocketUDT extends ServerSocket implements
IceServerSocket, IceCommon {
public class NetServerSocketUDT extends ServerSocket
implements IceServerSocket, IceCommon {

protected final SocketUDT socketUDT;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ protected NioServerSocketUDT(final ServerSocketChannelUDT channelUDT)

@Override
public Socket accept() throws IOException {
throw new RuntimeException("feature not available");
return channelUDT.accept().socket();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
/**
* Copyright (C) 2009-2013 Barchart, Inc. <http://www.barchart.com/>
*
* All rights reserved. Licensed under the OSI BSD License.
*
* http://www.opensource.org/licenses/bsd-license.php
*/
*
* All rights reserved. Licensed under the OSI BSD License.
*
* http://www.opensource.org/licenses/bsd-license.php
*/
package com.barchart.udt.nio;

import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.spi.AbstractSelectionKey;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -24,8 +25,8 @@
/**
* UDT selection key implementation.
*/
public class SelectionKeyUDT extends SelectionKey implements
Comparable<SelectionKeyUDT> {
public class SelectionKeyUDT extends AbstractSelectionKey
implements Comparable<SelectionKeyUDT> {

/**
* JDK interest to Epoll READ mapping.
Expand Down Expand Up @@ -93,7 +94,7 @@ public static final String toStringOps(final int selectOps) {
/**
* Key validity state. Key is valid when created, and invalid when canceled.
*/
private volatile boolean isValid;
// private volatile boolean isValid;

/**
* Reported ready interest.
Expand Down Expand Up @@ -140,17 +141,16 @@ protected void assertValidKey() throws CancelledKeyException {
*/
protected void assertValidOps(final int interestOps) {
if ((interestOps & ~(channel().validOps())) != 0) {
throw new IllegalArgumentException("invalid interestOps="
+ interestOps);
throw new IllegalArgumentException(
"invalid interestOps=" + interestOps);
}
}

@Override
public void cancel() {
if (isValid()) {
selector().cancel(this);
}
}
/*
* @Override
*
* public void cancel() { if (isValid()) { selector().cancel(this); } }
*/

@Override
public SelectableChannel channel() {
Expand Down Expand Up @@ -202,6 +202,10 @@ protected boolean doRead(final int resultIndex) {
readyOps = channel().validOps();
return true;
} else {
/**
* in some programe use invoke selectNow to clear canceld
* key
*/
logError("Unexpected error report.");
return false;
}
Expand Down Expand Up @@ -414,10 +418,11 @@ protected boolean isSocketBroken() {
}
}

@Override
public boolean isValid() {
return isValid;
}
/*
* @Override
*
* public boolean isValid() { return isValid; }
*/

/**
* Channel role.
Expand All @@ -427,13 +432,14 @@ protected KindUDT kindUDT() {
}

/**
* Key processing logic error logger.
* Key processing logic error logger. user while use selectNow to clear
* cancel keys
*/
protected void logError(final String comment) {

final String message = "logic error : \n\t" + this;

log.warn(message, new Exception("" + comment));
log.debug(message, new Exception("" + comment));

}

Expand All @@ -451,7 +457,10 @@ protected void makeValid(final boolean isValid) {
} catch (final Throwable e) {
log.error("Epoll failure.", e);
} finally {
this.isValid = isValid;
/*
* if (!isValid) { cancel(); }
*/
// this.isValid = isValid;
}
}

Expand Down Expand Up @@ -486,20 +495,20 @@ protected SocketUDT socketUDT() {
@Override
public String toString() {

return String
.format("[id: 0x%08x] poll=%s ready=%s inter=%s %s %s %s bind=%s:%s peer=%s:%s", //
socketUDT().id(), //
epollOpt, //
toStringOps(readyOps), //
toStringOps(interestOps), //
channelUDT.typeUDT(), //
channelUDT.kindUDT(), //
socketUDT().status(), //
socketUDT().getLocalInetAddress(), //
socketUDT().getLocalInetPort(), //
socketUDT().getRemoteInetAddress(), //
socketUDT().getRemoteInetPort() //
);
return String.format(
"[id: 0x%08x] poll=%s ready=%s inter=%s %s %s %s bind=%s:%s peer=%s:%s", //
socketUDT().id(), //
epollOpt, //
toStringOps(readyOps), //
toStringOps(interestOps), //
channelUDT.typeUDT(), //
channelUDT.kindUDT(), //
socketUDT().status(), //
socketUDT().getLocalInetAddress(), //
socketUDT().getLocalInetPort(), //
socketUDT().getRemoteInetAddress(), //
socketUDT().getRemoteInetPort() //
);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,11 @@ protected static Selector open(final TypeUDT type) throws IOException {
/**
* Canceled keys.
*/
private final ConcurrentMap<SelectionKeyUDT, SelectionKeyUDT> //
terminatedKeyMap = new ConcurrentHashMap<SelectionKeyUDT, SelectionKeyUDT>();
/*
* private final ConcurrentMap<SelectionKeyUDT, SelectionKeyUDT> //
* terminatedKeyMap = new ConcurrentHashMap<SelectionKeyUDT,
* SelectionKeyUDT>();
*/

/** guarded by {@link #doSelectLocked} */
private volatile int wakeupBaseCount;
Expand All @@ -150,31 +153,25 @@ protected SelectorUDT( //

}

/**
* Enqueue cancel request.
*/
protected void cancel(final SelectionKeyUDT keyUDT) {
terminatedKeyMap.putIfAbsent(keyUDT, keyUDT);
}

/**
* Process pending cancel requests.
*/
protected void doCancel() {

if (terminatedKeyMap.isEmpty()) {
return;
}

final Iterator<SelectionKeyUDT> iterator = terminatedKeyMap.values()
.iterator();

while (iterator.hasNext()) {
final SelectionKeyUDT keyUDT = iterator.next();
iterator.remove();
if (keyUDT.isValid()) {
final Set cks = cancelledKeys();

synchronized (cks) {
if (cks.isEmpty())
return;
final Iterator<SelectionKeyUDT> iterator = cks.iterator();
while (iterator.hasNext()) {
final SelectionKeyUDT keyUDT = iterator.next();
iterator.remove();
// if (keyUDT.isValid()) {
this.deregister(keyUDT);
keyUDT.makeValid(false);
registeredKeyMap.remove(keyUDT.socketId());
// }
}
}

Expand Down Expand Up @@ -210,14 +207,15 @@ protected int doEpollEnter(final long millisTimeout) throws IOException {
* >0 : finite;
* @return
*
* <0 : should not happen
* <0 : should not happen
*
* =0 : means nothing was selected/timeout
*
* >0 : number of selected keys
*/

protected int doEpollExclusive(final long millisTimeout) throws IOException {
protected int doEpollExclusive(final long millisTimeout)
throws IOException {

try {

Expand Down Expand Up @@ -299,7 +297,7 @@ protected int doEpollSelectUDT(final long timeout) throws ExceptionUDT {
writeBuffer, //
sizeBuffer, //
timeout //
);
);
}

protected void doResults() {
Expand Down Expand Up @@ -379,7 +377,7 @@ protected void implCloseSelector() throws IOException {
selectLock.lock();

for (final SelectionKeyUDT keyUDT : registeredKeyMap.values()) {
cancel(keyUDT);
keyUDT.cancel();
}

} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
* assert connectChannel.isConnected();
* </pre>
*/
public class ServerSocketChannelUDT extends ServerSocketChannel implements
ChannelUDT {
public class ServerSocketChannelUDT extends ServerSocketChannel
implements ChannelUDT {

protected static final Logger log = LoggerFactory
.getLogger(ServerSocketChannelUDT.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,8 @@ public int read(final ByteBuffer buffer) throws IOException {
}

@Override
public long read(final ByteBuffer[] dsts, final int offset, final int length)
throws IOException {
public long read(final ByteBuffer[] dsts, final int offset,
final int length) throws IOException {
throw new RuntimeException("feature not available");
}

Expand Down Expand Up @@ -521,5 +521,4 @@ public SocketChannelUDT bind(final SocketAddress localAddress)
return this;

}

}
3 changes: 1 addition & 2 deletions barchart-udt-core/src/main/patches/UDT4/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,4 @@
http://www.opensource.org/licenses/bsd-license.php

-->
patches to udt c++ v 4.X

patches to udt c++ v 4.11 from git
Loading