Skip to content

Commit

Permalink
Optimize heartbeat and reconnect task. (apache#2658)
Browse files Browse the repository at this point in the history
* Optimize heartbeat and reconnect task.
1.Use hashedWheelTimer.
2.Distinguish between reconnect and heartbeat.
3.Increase inspection cycle.

* fix ci fail.

* fix ci fail.

* fix ci fail.
  • Loading branch information
carryxyh authored and CrazyHZM committed Dec 6, 2018
1 parent a400ada commit b26495d
Show file tree
Hide file tree
Showing 13 changed files with 342 additions and 182 deletions.
10 changes: 10 additions & 0 deletions dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;

/**
* Constants
*/
Expand Down Expand Up @@ -246,6 +247,15 @@ public class Constants {

public static final String HEARTBEAT_KEY = "heartbeat";

public static final int HEARTBEAT_TICK = 3;

public static final long LEAST_HEARTBEAT_TICK = 1000;

/**
* ticks per wheel. Currently only contains two tasks, so 16 locations are enough
*/
public static final int TICKS_PER_WHEEL = 16;

public static final String HEARTBEAT_TIMEOUT_KEY = "heartbeat.timeout";

public static final String CONNECT_TIMEOUT_KEY = "connect.timeout";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,11 @@ public Set<Timeout> stop() {
return worker.unprocessedTimeouts();
}

@Override
public boolean isStop() {
return WORKER_STATE_SHUTDOWN == WORKER_STATE_UPDATER.get(this);
}

@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,11 @@ public interface Timer {
* this method
*/
Set<Timeout> stop();

/**
* the timer is stop
*
* @return true for stop
*/
boolean isStop();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.remoting.exchange.support.header;

import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.common.timer.Timer;
import org.apache.dubbo.common.timer.TimerTask;
import org.apache.dubbo.remoting.Channel;

import java.util.Collection;
import java.util.concurrent.TimeUnit;

/**
* AbstractTimerTask
*/
public abstract class AbstractTimerTask implements TimerTask {

protected final ChannelProvider channelProvider;

protected final Long tick;

protected AbstractTimerTask(ChannelProvider channelProvider, Long tick) {
if (channelProvider == null || tick == null) {
throw new IllegalArgumentException();
}
this.tick = tick;
this.channelProvider = channelProvider;
}

protected Long lastRead(Channel channel) {
return (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_READ_TIMESTAMP);
}

protected Long lastWrite(Channel channel) {
return (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
}

protected Long now() {
return System.currentTimeMillis();
}

protected void reput(Timeout timeout, Long tick) {
if (timeout == null || tick == null) {
throw new IllegalArgumentException();
}
Timer timer = timeout.timer();
if (timer.isStop()) {
return;
}
if (timeout.isCancelled()) {
return;
}
timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS);
}

@Override
public void run(Timeout timeout) throws Exception {
Collection<Channel> c = channelProvider.getChannels();
for (Channel channel : c) {
if (channel.isClosed()) {
continue;
}
doTask(channel);
}
reput(timeout, tick);
}

protected abstract void doTask(Channel channel);

interface ChannelProvider {
Collection<Channel> getChannels();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.common.timer.HashedWheelTimer;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Client;
import org.apache.dubbo.remoting.RemotingException;
Expand All @@ -31,28 +28,22 @@
import org.apache.dubbo.remoting.exchange.ResponseFuture;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* DefaultMessageClient
*/
public class HeaderExchangeClient implements ExchangeClient {

private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeClient.class);

private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
private final Client client;
private final ExchangeChannel channel;
// heartbeat timer
private ScheduledFuture<?> heartbeatTimer;
// heartbeat(ms), default value is 0 , won't execute a heartbeat.
private int heartbeat;
private int heartbeatTimeout;

private HashedWheelTimer heartbeatTimer;

public HeaderExchangeClient(Client client, boolean needHeartbeat) {
if (client == null) {
throw new IllegalArgumentException("client == null");
Expand All @@ -65,7 +56,12 @@ public HeaderExchangeClient(Client client, boolean needHeartbeat) {
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}

if (needHeartbeat) {
long heartbeatTick = calcLeastTick(heartbeat);

// use heartbeatTick as every tick.
heartbeatTimer = new HashedWheelTimer(heartbeatTick, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
startHeartbeatTimer();
}
}
Expand Down Expand Up @@ -181,37 +177,40 @@ public boolean hasAttribute(String key) {
}

private void startHeartbeatTimer() {
stopHeartbeatTimer();
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
@Override
public Collection<Channel> getChannels() {
return Collections.<Channel>singletonList(HeaderExchangeClient.this);
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);

long heartbeatTick = calcLeastTick(heartbeat);
long heartbeatTimeoutTick = calcLeastTick(heartbeatTimeout);
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);

// init task and start timer.
heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}

private void stopHeartbeatTimer() {
if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
try {
heartbeatTimer.cancel(true);
scheduled.purge();
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn(e.getMessage(), e);
}
}
if (heartbeatTimer != null) {
heartbeatTimer.stop();
heartbeatTimer = null;
}
heartbeatTimer = null;
}

private void doClose() {
stopHeartbeatTimer();
}

/**
* Each interval cannot be less than 1000ms.
*/
private long calcLeastTick(int time) {
if (time / Constants.HEARTBEAT_TICK <= 0) {
return Constants.LEAST_HEARTBEAT_TICK;
} else {
return time / Constants.HEARTBEAT_TICK;
}
}

@Override
public String toString() {
return "HeaderExchangeClient [channel=" + channel + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.timer.HashedWheelTimer;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
Expand All @@ -34,9 +34,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -47,18 +44,14 @@ public class HeaderExchangeServer implements ExchangeServer {

protected final Logger logger = LoggerFactory.getLogger(getClass());

private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,
new NamedThreadFactory(
"dubbo-remoting-server-heartbeat",
true));
private final Server server;
// heartbeat timer
private ScheduledFuture<?> heartbeatTimer;
// heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
private int heartbeat;
private int heartbeatTimeout;
private AtomicBoolean closed = new AtomicBoolean(false);

private HashedWheelTimer heartbeatTimer;

public HeaderExchangeServer(Server server) {
if (server == null) {
throw new IllegalArgumentException("server == null");
Expand All @@ -69,6 +62,10 @@ public HeaderExchangeServer(Server server) {
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}

long heartbeatTick = calcLeastTick(heartbeat);
// use heartbeatTick as every tick.
heartbeatTimer = new HashedWheelTimer(heartbeatTick, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
startHeartbeatTimer();
}

Expand Down Expand Up @@ -153,11 +150,6 @@ private void doClose() {
return;
}
stopHeartbeatTimer();
try {
scheduled.shutdown();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}

@Override
Expand Down Expand Up @@ -223,6 +215,12 @@ public void reset(URL url) {
if (h != heartbeat || t != heartbeatTimeout) {
heartbeat = h;
heartbeatTimeout = t;

stopHeartbeatTimer();

long heartbeatTick = calcLeastTick(heartbeat);
// use heartbeatTick as every tick.
heartbeatTimer = new HashedWheelTimer(heartbeatTick, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
startHeartbeatTimer();
}
}
Expand Down Expand Up @@ -254,29 +252,32 @@ public void send(Object message, boolean sent) throws RemotingException {
}

private void startHeartbeatTimer() {
stopHeartbeatTimer();
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
@Override
public Collection<Channel> getChannels() {
return Collections.unmodifiableCollection(
HeaderExchangeServer.this.getChannels());
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
AbstractTimerTask.ChannelProvider cp = () -> Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels());

long heartbeatTick = calcLeastTick(heartbeat);
long heartbeatTimeoutTick = calcLeastTick(heartbeatTimeout);
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);

// init task and start timer.
heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}

/**
* Each interval cannot be less than 1000ms.
*/
private long calcLeastTick(int time) {
if (time / Constants.HEARTBEAT_TICK <= 0) {
return Constants.LEAST_HEARTBEAT_TICK;
} else {
return time / Constants.HEARTBEAT_TICK;
}
}

private void stopHeartbeatTimer() {
try {
ScheduledFuture<?> timer = heartbeatTimer;
if (timer != null && !timer.isCancelled()) {
timer.cancel(true);
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
} finally {
if (heartbeatTimer != null) {
heartbeatTimer.stop();
heartbeatTimer = null;
}
}
Expand Down
Loading

0 comments on commit b26495d

Please sign in to comment.