Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(net): disconnect from malicious nodes if necessary #5899

Closed
wants to merge 30 commits into from
Closed
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
7dbe7c6
initial submit of disconnect zombie node
317787106 Jun 26, 2024
53dab13
format MaliciousFeature
317787106 Jun 26, 2024
1b6ca45
rename to peerNotActiveTime
317787106 Jun 26, 2024
9e2b144
modify default peerNotActiveTime to 600
317787106 Jun 26, 2024
1b198f8
reduce log
317787106 Jun 27, 2024
5a41c62
set type of peerNoBlockTime to seconds
317787106 Jun 27, 2024
4af6102
close resilienceService when TronNetService close
317787106 Jun 27, 2024
d7d5aff
update feature 2
317787106 Jun 27, 2024
d7c0fef
rearrange the close order
317787106 Jun 27, 2024
75f405d
simplfy PeerConnection
317787106 Jun 28, 2024
22c54ed
don't disconnect with active peer if connection is full in case 3
317787106 Jun 28, 2024
7207537
add some log
317787106 Jun 28, 2024
2642b9f
add condition when disconnect
317787106 Jun 28, 2024
edf6934
add feature if enables
317787106 Jun 28, 2024
81b4289
add testcase ResilienceServiceTest
317787106 Jul 1, 2024
6c87144
merge develop
317787106 Jul 1, 2024
f2a1732
rename name of config value
317787106 Jul 2, 2024
406c5c2
schedule to test after 10 seconds
317787106 Jul 3, 2024
c5c3add
only one of feature3 and feature4 is used
317787106 Jul 3, 2024
90af8ef
use same peerNotActiveThreshold for block and inventory
317787106 Jul 3, 2024
828077b
add testcase testCondition1StopInv
317787106 Jul 3, 2024
a57de7a
fix checkstyle and sonar check
317787106 Jul 3, 2024
7836f4c
test pause send inventory
317787106 Jul 4, 2024
24bcaa6
set TEST_PAUSE_INV_SECONDS to constant
317787106 Jul 4, 2024
5bf9ea3
delete config item node.peerNoBlockTime
317787106 Jul 4, 2024
15b3abc
init latestSaveBlockTime in init method
317787106 Jul 9, 2024
369cd92
use setNeedSyncFromPeer,setNeedSyncFromUs method
317787106 Jul 9, 2024
3662502
update some default value of class Feature
317787106 Jul 9, 2024
3a85f98
check if addInv success
317787106 Jul 10, 2024
eafda50
use ReasonCode.BAD_PROTOCOL; noInvBackTime is recoverable
317787106 Jul 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions chainbase/src/main/java/org/tron/core/ChainBaseManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ public class ChainBaseManager {
@Setter
private long lowestBlockNum = -1; // except num = 0.

@Getter
@Setter
private long latestSaveBlockTime = System.currentTimeMillis();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why define this variable? Can it be determined by the header block time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It cannot be determined by the header block time. The variable is used to determine how long ago I save the last block. Let's suppose that i am far from newest block, the header block time is useless in this scene.

// for test only
public List<ByteString> getWitnesses() {
return witnessScheduleStore.getActiveWitnesses();
Expand Down
2 changes: 1 addition & 1 deletion common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ dependencies {
api 'org.aspectj:aspectjrt:1.8.13'
api 'org.aspectj:aspectjweaver:1.8.13'
api 'org.aspectj:aspectjtools:1.8.13'
api group: 'io.github.tronprotocol', name: 'libp2p', version: '2.2.1',{
api group: 'com.github.317787106', name: 'libp2p', version: 'v0.0.5',{
exclude group: 'io.grpc', module: 'grpc-context'
exclude group: 'io.grpc', module: 'grpc-core'
exclude group: 'io.grpc', module: 'grpc-netty'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ public class CommonParameter {
public boolean nodeEffectiveCheckEnable;
@Getter
@Setter
public ResilienceConfig resilienceConfig;
@Getter
@Setter
public int nodeConnectionTimeout;
@Getter
@Setter
Expand Down Expand Up @@ -333,6 +336,9 @@ public class CommonParameter {
public boolean isOpenFullTcpDisconnect;
@Getter
@Setter
public int peerNoBlockTime;
317787106 marked this conversation as resolved.
Show resolved Hide resolved
@Getter
@Setter
public boolean nodeDetectEnable;
@Getter
@Setter
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.tron.common.parameter;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "net")
public class ResilienceConfig {
@Getter
@Setter
private boolean enabled = false;

@Getter
@Setter
private int checkInterval = 60;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not need to be defined as a parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The specification requires that the value of check interval is configurable.


@Getter
@Setter
private int peerNotActiveThreshold = 600;

@Getter
@Setter
private int blockNotChangeThreshold = 300;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is recommended to use inactive instead, and merge the two parameters into one parameter inactiveThreshold.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They have completely different definition. blockNotChangeThreshold is the config parameter that means if latest block has stay unchanged in blockNotChangeThreshold seconds, we think the node is isolated by peers.


@Getter
@Setter
private boolean testStopInv = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only a field in a feature. The feature should be configurable. If the feature is not enabled, defining this field is useless.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used to specify whether we test if the peer is malicious. There is alternative method. Only one of them can be used. It can be configured in config.conf.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a field in the rule. What I mean is that you should configure it according to the rules, not the fields in the rules. When you want to enable or disable rules, you can just change the configuration without changing the code.


@Getter
@Setter
private int disconnectNumber = 1;

}
8 changes: 8 additions & 0 deletions common/src/main/java/org/tron/core/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@ public class Constant {
public static final String NODE_DNS_AWS_REGION = "node.dns.awsRegion";
public static final String NODE_DNS_AWS_HOST_ZONE_ID = "node.dns.awsHostZoneId";

public static final String NODE_RESILIENCE_ENABLE = "node.resilience.enable";
public static final String NODE_RESILIENCE_CHECK_INTERVAL = "node.resilience.checkInterval";
public static final String NODE_RESILIENCE_PEER_NOT_ACTIVE_THRESHOLD = "node.resilience.peerNotActiveThreshold";
public static final String NODE_RESILIENCE_BLOCK_NOT_CHANGE_THRESHOLD = "node.resilience.blockNotChangeThreshold";
public static final String NODE_RESILIENCE_TEST_STOP_INV = "node.resilience.testStopInv";
public static final String NODE_RESILIENCE_DISCONNECT_NUMBER = "node.resilience.disconnectNumber";

public static final String NODE_RPC_PORT = "node.rpc.port";
public static final String NODE_RPC_SOLIDITY_PORT = "node.rpc.solidityPort";
public static final String NODE_RPC_PBFT_PORT = "node.rpc.PBFTPort";
Expand Down Expand Up @@ -197,6 +204,7 @@ public class Constant {
public static final String NODE_RECEIVE_TCP_MIN_DATA_LENGTH = "node.receiveTcpMinDataLength";

public static final String NODE_IS_OPEN_FULL_TCP_DISCONNECT = "node.isOpenFullTcpDisconnect";
public static final String NODE_PEER_NO_BLOCK_TIME = "node.peerNoBlockTime";

public static final String NODE_DETECT_ENABLE = "node.nodeDetectEnable";

Expand Down
36 changes: 36 additions & 0 deletions framework/src/main/java/org/tron/core/config/args/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.tron.common.logsfilter.trigger.ContractLogTrigger;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.parameter.RateLimiterInitialization;
import org.tron.common.parameter.ResilienceConfig;
import org.tron.common.setting.RocksDbSettings;
import org.tron.common.utils.ByteArray;
import org.tron.common.utils.Commons;
Expand Down Expand Up @@ -172,6 +173,7 @@ public static void clearParam() {
PARAMETER.estimateEnergyMaxRetry = 3;
PARAMETER.receiveTcpMinDataLength = 2048;
PARAMETER.isOpenFullTcpDisconnect = false;
PARAMETER.peerNoBlockTime = 300;
PARAMETER.nodeDetectEnable = false;
PARAMETER.supportConstant = false;
PARAMETER.debug = false;
Expand Down Expand Up @@ -569,6 +571,8 @@ public static void setParam(final String[] args, final String confFileName) {
config.hasPath(Constant.NODE_EFFECTIVE_CHECK_ENABLE)
&& config.getBoolean(Constant.NODE_EFFECTIVE_CHECK_ENABLE);

PARAMETER.resilienceConfig = loadResilienceConfig(config);

PARAMETER.nodeConnectionTimeout =
config.hasPath(Constant.NODE_CONNECTION_TIMEOUT)
? config.getInt(Constant.NODE_CONNECTION_TIMEOUT) * 1000
Expand Down Expand Up @@ -841,6 +845,8 @@ public static void setParam(final String[] args, final String confFileName) {

PARAMETER.isOpenFullTcpDisconnect = config.hasPath(Constant.NODE_IS_OPEN_FULL_TCP_DISCONNECT)
&& config.getBoolean(Constant.NODE_IS_OPEN_FULL_TCP_DISCONNECT);
PARAMETER.peerNoBlockTime = config.hasPath(Constant.NODE_PEER_NO_BLOCK_TIME)
? config.getInt(Constant.NODE_PEER_NO_BLOCK_TIME) : 300;

PARAMETER.nodeDetectEnable = config.hasPath(Constant.NODE_DETECT_ENABLE)
&& config.getBoolean(Constant.NODE_DETECT_ENABLE);
Expand Down Expand Up @@ -1483,6 +1489,35 @@ private static void logEmptyError(String arg) {
throw new IllegalArgumentException(String.format("Check %s, must not be null or empty", arg));
}

private static ResilienceConfig loadResilienceConfig(final com.typesafe.config.Config config) {
ResilienceConfig resilienceConfig = new ResilienceConfig();
if (config.hasPath(Constant.NODE_RESILIENCE_ENABLE)) {
resilienceConfig.setEnabled(config.getBoolean(Constant.NODE_RESILIENCE_ENABLE));
}
if (resilienceConfig.isEnabled()) {
if (config.hasPath(Constant.NODE_RESILIENCE_CHECK_INTERVAL)) {
resilienceConfig.setCheckInterval(config.getInt(Constant.NODE_RESILIENCE_CHECK_INTERVAL));
}
if (config.hasPath(Constant.NODE_RESILIENCE_PEER_NOT_ACTIVE_THRESHOLD)) {
resilienceConfig.setPeerNotActiveThreshold(
config.getInt(Constant.NODE_RESILIENCE_PEER_NOT_ACTIVE_THRESHOLD));
}
if (config.hasPath(Constant.NODE_RESILIENCE_BLOCK_NOT_CHANGE_THRESHOLD)) {
resilienceConfig.setBlockNotChangeThreshold(
config.getInt(Constant.NODE_RESILIENCE_BLOCK_NOT_CHANGE_THRESHOLD));
}
if (config.hasPath(Constant.NODE_RESILIENCE_TEST_STOP_INV)) {
resilienceConfig.setTestStopInv(config.getBoolean(Constant.NODE_RESILIENCE_TEST_STOP_INV));
}
if (config.hasPath(Constant.NODE_RESILIENCE_DISCONNECT_NUMBER)) {
resilienceConfig.setDisconnectNumber(
config.getInt(Constant.NODE_RESILIENCE_DISCONNECT_NUMBER));
}
}

return resilienceConfig;
}

private static TriggerConfig createTriggerConfig(ConfigObject triggerObject) {
if (Objects.isNull(triggerObject)) {
return null;
Expand Down Expand Up @@ -1650,6 +1685,7 @@ public static void logConfig() {
logger.info("Open full tcp disconnect: {}", parameter.isOpenFullTcpDisconnect());
logger.info("Node detect enable: {}", parameter.isNodeDetectEnable());
logger.info("Node effective check enable: {}", parameter.isNodeEffectiveCheckEnable());
logger.info("Node resilience check enable: {}", parameter.resilienceConfig.isEnabled());
logger.info("Rate limiter global qps: {}", parameter.getRateLimiterGlobalQps());
logger.info("Rate limiter global ip qps: {}", parameter.getRateLimiterGlobalIpQps());
logger.info("Rate limiter global api qps: {}", parameter.getRateLimiterGlobalApiQps());
Expand Down
1 change: 1 addition & 0 deletions framework/src/main/java/org/tron/core/db/Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,7 @@ public void updateDynamicProperties(BlockCapsule block) {
(chainBaseManager.getDynamicPropertiesStore().getLatestBlockHeaderNumber()
- chainBaseManager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum()
+ 1));
chainBaseManager.setLatestSaveBlockTime(System.currentTimeMillis());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assignment statement should not be placed in this method, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We verify whether the node is isolated through the latest writing block time to database. If this time stay unchanged over some minutes, it's isolated. Do you have better solution or place?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not deny the timing of assigning this variable, I just feel that the assignment statement is unreasonable in this method, I think you can put it near where this method is called, what do you think?

Copy link
Contributor Author

@317787106 317787106 Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can assign it in ResilienceService where it's used, but it's not accurate. Let's try it.

Metrics.gaugeSet(MetricKeys.Gauge.HEADER_HEIGHT, block.getNum());
Metrics.gaugeSet(MetricKeys.Gauge.HEADER_TIME, block.getTimeStamp());
}
Expand Down
7 changes: 7 additions & 0 deletions framework/src/main/java/org/tron/core/net/TronNetService.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.tron.core.net.peer.PeerStatusCheck;
import org.tron.core.net.service.adv.AdvService;
import org.tron.core.net.service.effective.EffectiveCheckService;
import org.tron.core.net.service.effective.ResilienceService;
import org.tron.core.net.service.fetchblock.FetchBlockService;
import org.tron.core.net.service.nodepersist.NodePersistService;
import org.tron.core.net.service.relay.RelayService;
Expand Down Expand Up @@ -73,6 +74,9 @@ public class TronNetService {
@Autowired
private EffectiveCheckService effectiveCheckService;

@Autowired
private ResilienceService resilienceService;

private volatile boolean init;

private static void setP2pConfig(P2pConfig config) {
Expand All @@ -95,6 +99,7 @@ public void start() {
PeerManager.init();
relayService.init();
effectiveCheckService.init();
resilienceService.init();
logger.info("Net service start successfully");
} catch (Exception e) {
logger.error("Net service start failed", e);
Expand All @@ -113,6 +118,7 @@ public void close() {
peerStatusCheck.close();
transactionsMsgHandler.close();
fetchBlockService.close();
resilienceService.close();
effectiveCheckService.close();
p2pService.close();
relayService.close();
Expand Down Expand Up @@ -178,6 +184,7 @@ private P2pConfig updateConfig(P2pConfig config) {
config.setPort(parameter.getNodeListenPort());
config.setNetworkId(parameter.getNodeP2pVersion());
config.setDisconnectionPolicyEnable(parameter.isOpenFullTcpDisconnect());
config.setNotActiveInterval(parameter.peerNoBlockTime * 1000L);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does libp2p need this parameter for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The strategy of Random disconnection use this parameter to filter no block peer. If we send or receive some blocks among peerNoBlockTime from this peer, it cannot be disconnect.

config.setNodeDetectEnable(parameter.isNodeDetectEnable());
config.setDiscoverEnable(parameter.isNodeDiscoveryEnable());
if (StringUtils.isEmpty(config.getIp()) && hasIpv4Stack(NetUtil.getAllLocalAddress())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
check(peer, blockMessage);
}

peer.getChannel().setLastActiveTime(System.currentTimeMillis());
if (peer.getSyncBlockRequested().containsKey(blockId)) {
peer.getSyncBlockRequested().remove(blockId);
peer.getSyncBlockInProcess().add(blockId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
Deque<BlockId> blockIdWeGet = new LinkedList<>(chainInventoryMessage.getBlockIds());

if (blockIdWeGet.size() == 1 && tronNetDelegate.containBlock(blockIdWeGet.peek())) {
if (blockIdWeGet.peek().getNum() < peer.getHelloMessageReceive().getSolidBlockId().getNum()) {
peer.getMaliciousFeature().updateBadFeature1();
}
peer.setTronState(TronState.SYNC_COMPLETED);
peer.setNeedSyncFromPeer(false);
peer.updateAdvStartTime();
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public void processMessage(PeerConnection peer, TronMessage msg) {
Item item = new Item(id, type);
peer.getAdvInvReceive().put(item, System.currentTimeMillis());
advService.addInv(item);

317787106 marked this conversation as resolved.
Show resolved Hide resolved
if (type.equals(InventoryType.BLOCK) && peer.getAdvInvSpread().getIfPresent(item) == null) {
peer.getMaliciousFeature().setLastRecBlockInvTime(System.currentTimeMillis());
peer.getMaliciousFeature().resetStopBlockInvTime(); //stop test
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
long remainNum = 0;

List<BlockId> summaryChainIds = syncBlockChainMessage.getBlockIds();
if (peer.isNeedSyncFromUs() && summaryChainIds.size() == 1 && summaryChainIds.get(0).getNum()
== peer.getHelloMessageSend().getHeadBlockId().getNum()) {
peer.getMaliciousFeature().updateBadFeature2();
}

BlockId headID = tronNetDelegate.getHeadBlockId();
LinkedList<BlockId> blockIds = getLostBlockIds(summaryChainIds, headID);

Expand All @@ -46,6 +51,7 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
return;
} else if (blockIds.size() == 1) {
peer.setNeedSyncFromUs(false);
peer.updateAdvStartTime();
} else {
peer.setNeedSyncFromUs(true);
remainNum = headID.getNum() - blockIds.peekLast().getNum();
Expand Down
Loading
Loading