Skip to content

Commit

Permalink
feat(*): improve performance
Browse files Browse the repository at this point in the history
  1. add second-cache for revoking db
  2. optimize pack trx
  • Loading branch information
halibobo1205 committed Nov 1, 2022
1 parent 2d88f6e commit fd1d0f1
Show file tree
Hide file tree
Showing 16 changed files with 357 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ public void addTransaction(TransactionCapsule pendingTrx) {
getTransactions().add(pendingTrx);
}

public void addAllTransactions(List<TransactionCapsule> pendingTrxs) {
List<Transaction> list = pendingTrxs.stream().map(TransactionCapsule::getInstance).collect(
Collectors.toList());
this.block = this.block.toBuilder().addAllTransactions(list).build();
getTransactions().addAll(pendingTrxs);
}

public List<TransactionCapsule> getTransactions() {
return transactions;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.tron.core.capsule;

import com.google.protobuf.InvalidProtocolBufferException;
import java.util.List;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.tron.core.exception.BadItemException;
Expand Down Expand Up @@ -29,7 +30,7 @@ public TransactionRetCapsule() {

public TransactionRetCapsule(byte[] data) throws BadItemException {
try {
this.transactionRet = transactionRet.parseFrom(data);
this.transactionRet = TransactionRet.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new BadItemException("TransactionInfoCapsule proto data parse exception");
}
Expand All @@ -39,6 +40,10 @@ public void addTransactionInfo(TransactionInfo result) {
this.transactionRet = this.transactionRet.toBuilder().addTransactioninfo(result).build();
}

public void addAllTransactionInfos(List<TransactionInfo> results) {
this.transactionRet = this.transactionRet.toBuilder().addAllTransactioninfo(results).build();
}

@Override
public byte[] getData() {
if (Objects.isNull(transactionRet)) {
Expand Down
2 changes: 1 addition & 1 deletion chainbase/src/main/java/org/tron/core/db2/common/DB.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public interface DB<K, V> extends Iterable<Map.Entry<K, V>>, Instance<DB<K, V>>

void remove(K k);

Iterator iterator();
Iterator<Map.Entry<K, V>> iterator();

void close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ public abstract class AbstractSnapshot<K, V> implements Snapshot {

protected WeakReference<Snapshot> next;

protected boolean isOptimized;

@Override
public Snapshot advance() {
return new SnapshotImpl(this);
Expand All @@ -36,9 +34,4 @@ public void setNext(Snapshot next) {
public String getDbName() {
return db.getDbName();
}

@Override
public boolean isOptimized(){
return isOptimized;
}
}
2 changes: 0 additions & 2 deletions chainbase/src/main/java/org/tron/core/db2/core/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,4 @@ static boolean isImpl(Snapshot snapshot) {
void updateSolidity();

String getDbName();

boolean isOptimized();
}
14 changes: 0 additions & 14 deletions chainbase/src/main/java/org/tron/core/db2/core/SnapshotImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,6 @@ public class SnapshotImpl extends AbstractSnapshot<Key, Value> {
}
previous = snapshot;
snapshot.setNext(this);
// inherit
isOptimized = snapshot.isOptimized();
// merge for DynamicPropertiesStore,about 100 keys
if (isOptimized) {
if (root == previous ){
Streams.stream(root.iterator()).forEach( e -> put(e.getKey(),e.getValue()));
}else {
merge(previous);
}
}
}

@Override
Expand All @@ -50,10 +40,6 @@ public byte[] get(byte[] key) {
private byte[] get(Snapshot head, byte[] key) {
Snapshot snapshot = head;
Value value;
if (isOptimized) {
value = db.get(Key.of(key));
return value == null ? null: value.getBytes();
}
while (Snapshot.isImpl(snapshot)) {
if ((value = ((SnapshotImpl) snapshot).db.get(Key.of(key))) != null) {
return value.getBytes();
Expand Down
59 changes: 53 additions & 6 deletions chainbase/src/main/java/org/tron/core/db2/core/SnapshotRoot.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package org.tron.core.db2.core;

import ch.qos.logback.core.encoder.ByteArrayUtil;
import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.Getter;
import org.tron.common.cache.CacheManager;
import org.tron.common.cache.TronCache;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.utils.ByteArray;
import org.tron.core.ChainBaseManager;
import org.tron.core.capsule.AccountCapsule;
Expand All @@ -23,11 +26,17 @@ public class SnapshotRoot extends AbstractSnapshot<byte[], byte[]> {
private Snapshot solidity;
private boolean isAccountDB;

private TronCache<WrappedByteArray, WrappedByteArray> cache;
private static final List<String> CACHE_DBS = CommonParameter.getInstance()
.getStorage().getCacheDbs();

public SnapshotRoot(DB<byte[], byte[]> db) {
this.db = db;
solidity = this;
isOptimized = "properties".equalsIgnoreCase(db.getDbName());
isAccountDB = "account".equalsIgnoreCase(db.getDbName());
if (CACHE_DBS.contains(this.db.getDbName())) {
this.cache = CacheManager.allocate(this.db.getDbName());
}
}

private boolean needOptAsset() {
Expand All @@ -37,11 +46,18 @@ private boolean needOptAsset() {

@Override
public byte[] get(byte[] key) {
return db.get(key);
WrappedByteArray cache = getCache(key);
if (cache != null) {
return cache.getBytes();
}
byte[] value = db.get(key);
putCache(key, value);
return value;
}

@Override
public void put(byte[] key, byte[] value) {
byte[] v = value;
if (needOptAsset()) {
if (ByteArray.isEmpty(value)) {
remove(key);
Expand All @@ -56,10 +72,10 @@ public void put(byte[] key, byte[] value) {
}
assetStore.putAccount(item.getInstance());
item.clearAsset();
db.put(key, item.getData());
} else {
db.put(key, value);
v = item.getData();
}
db.put(key, v);
putCache(key, v);
}

@Override
Expand All @@ -68,6 +84,7 @@ public void remove(byte[] key) {
ChainBaseManager.getInstance().getAccountAssetStore().deleteAccount(key);
}
db.remove(key);
putCache(key, null);
}

@Override
Expand All @@ -81,6 +98,7 @@ public void merge(Snapshot from) {
processAccount(batch);
} else {
((Flusher) db).flush(batch);
putCache(batch);
}
}

Expand All @@ -97,6 +115,7 @@ public void merge(List<Snapshot> snapshots) {
processAccount(batch);
} else {
((Flusher) db).flush(batch);
putCache(batch);
}
}

Expand All @@ -120,11 +139,37 @@ private void processAccount(Map<WrappedByteArray, WrappedByteArray> batch) {
}
});
((Flusher) db).flush(accounts);
putCache(accounts);
if (assets.size() > 0) {
assetStore.updateByBatch(AccountAssetStore.convert(assets));
}
}

private boolean cached() {
return Objects.nonNull(this.cache);
}

private void putCache(byte[] key, byte[] value) {
if (cached()) {
cache.put(WrappedByteArray.of(key), WrappedByteArray.of(value));
}
}

private void putCache(Map<WrappedByteArray, WrappedByteArray> values) {
if (cached()) {
values.forEach(cache::put);
}
}

private WrappedByteArray getCache(byte[] key) {
if (cached()) {
return cache.getIfPresent(WrappedByteArray.of(key));
}
return null;
}

// second cache

@Override
public Snapshot retreat() {
return this;
Expand All @@ -142,11 +187,13 @@ public Iterator<Map.Entry<byte[], byte[]>> iterator() {

@Override
public void close() {
CacheManager.release(this.getDbName());
((Flusher) db).close();
}

@Override
public void reset() {
CacheManager.release(this.getDbName());
((Flusher) db).reset();
}

Expand Down
45 changes: 45 additions & 0 deletions common/src/main/java/org/tron/common/cache/CacheManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.tron.common.cache;

import com.google.common.cache.CacheLoader;
import com.google.common.cache.CacheStats;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.stream.Collectors;
import org.tron.common.parameter.CommonParameter;

public class CacheManager {

private static final Map<String, TronCache<?, ?>> CACHES = Maps.newConcurrentMap();

public static <K, V> TronCache<K, V> allocate(String name) {
TronCache<K, V> cache = new TronCache<>(name, CommonParameter.getInstance()
.getStorage().getCacheStrategy(name));
CACHES.put(name, cache);
return cache;
}

public static <K, V> TronCache<K, V> allocate(String name, String strategy) {
TronCache<K, V> cache = new TronCache<>(name, strategy);
CACHES.put(name, cache);
return cache;
}

public static <K, V> TronCache<K, V> allocate(String name, String strategy,
CacheLoader<K, V> loader) {
TronCache<K, V> cache = new TronCache<>(name, strategy, loader);
CACHES.put(name, cache);
return cache;
}

public static void release(String name) {
TronCache cache = CACHES.remove(name);
if (cache != null) {
cache.invalidateAll();
}
}

public static Map<String, CacheStats> stats() {
return CACHES.values().stream().collect(Collectors.toMap(TronCache::getName, TronCache::stats));
}

}
66 changes: 66 additions & 0 deletions common/src/main/java/org/tron/common/cache/TronCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.tron.common.cache;

import com.google.common.base.Objects;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.CacheStats;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import lombok.Getter;

public class TronCache<K, V> {

private static final int CPUS = Runtime.getRuntime().availableProcessors();

@Getter
private final String name;
private final Cache<K, V> cache;

TronCache(String name, String strategy) {
this.name = name;
this.cache = CacheBuilder.from(strategy).concurrencyLevel(CPUS).recordStats().build();
}

TronCache(String name, String strategy, CacheLoader<K, V> loader) {
this.name = name;
this.cache = CacheBuilder.from(strategy).concurrencyLevel(CPUS).recordStats().build(loader);
}

public void put(K k, V v) {
this.cache.put(k, v);
}

public V getIfPresent(K k) {
return this.cache.getIfPresent(k);
}

public V get(K k, Callable<? extends V> loader) throws ExecutionException {
return this.cache.get(k, loader);
}

public CacheStats stats() {
return this.cache.stats();
}

public void invalidateAll() {
this.cache.invalidateAll();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TronCache<?, ?> tronCache = (TronCache<?, ?>) o;
return Objects.equal(name, tronCache.name);
}

@Override
public int hashCode() {
return Objects.hashCode(name);
}
}
Loading

0 comments on commit fd1d0f1

Please sign in to comment.