Untimely event consumption may cause node OOM #5721

Closed
317787106 opened this issue Feb 8, 2024 · 4 comments
Labels
topic: Event subscribe transaction trigger, block trigger, contract event, contract log type:bug

Comments

@317787106
Contributor

Rationale

Java-tron can load event plug-ins through its configuration file; the Mongo and Kafka plug-ins are currently available. For the plug-in implementation, see https://github.com/tronprotocol/event-plugin. The node consumes events and, through the plug-in, either serializes them into Mongo or streams them to Kafka.

All events are buffered in a BlockingQueue:

 private BlockingQueue<TriggerCapsule> triggerCapsuleQueue;

There are multiple producers, such as org.tron.core.db.Manager#postTransactionTrigger, which writes the transaction's logs to the queue:

 private void postTransactionTrigger(final TransactionCapsule trxCap,
      final BlockCapsule blockCap) {
    TransactionLogTriggerCapsule trx = new TransactionLogTriggerCapsule(trxCap, blockCap);
    trx.setLatestSolidifiedBlockNumber(getDynamicPropertiesStore()
        .getLatestSolidifiedBlockNum());
    if (!triggerCapsuleQueue.offer(trx)) {
      logger.info("Too many triggers, transaction trigger lost: {}.", trxCap.getTransactionId());
    }
  }
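The "trigger lost" branch above only runs when offer returns false, which java.util.concurrent queues do only when they are at capacity. The issue does not show how triggerCapsuleQueue is constructed, so the following is just a minimal sketch with standard-library queues (the class and method names are hypothetical) contrasting a bounded queue, which rejects items, with an unbounded one, which keeps growing:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of java-tron.
class OfferDemo {
  /** Offers `count` items and returns how many were rejected (offer returned false). */
  static int countRejected(BlockingQueue<Integer> queue, int count) {
    int rejected = 0;
    for (int i = 0; i < count; i++) {
      if (!queue.offer(i)) {
        rejected++;
      }
    }
    return rejected;
  }

  /** Bounded queue: offers beyond `capacity` are rejected because nothing is consumed. */
  static int rejectedWhenBounded(int capacity, int count) {
    return countRejected(new LinkedBlockingQueue<>(capacity), count);
  }

  /** Unbounded queue: every offer is accepted, so memory use grows with the backlog. */
  static int rejectedWhenUnbounded(int count) {
    return countRejected(new LinkedBlockingQueue<>(), count);
  }

  public static void main(String[] args) {
    System.out.println("bounded, rejected: " + rejectedWhenBounded(3, 5));
    System.out.println("unbounded, rejected: " + rejectedWhenUnbounded(5));
  }
}
```

With an unbounded queue nothing is ever dropped at the producer, so a slow consumer shows up as memory growth rather than as "trigger lost" log lines.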

But there is only one consumer: org.tron.core.db.Manager#triggerCapsuleProcessLoop:

  private Runnable triggerCapsuleProcessLoop =
      () -> {
        while (isRunTriggerCapsuleProcessThread) {
          try {
            TriggerCapsule triggerCapsule = triggerCapsuleQueue.poll(1, TimeUnit.SECONDS);
            if (triggerCapsule != null) {
              triggerCapsule.processTrigger();
            }
          } catch (InterruptedException ex) {
            logger.info(ex.getMessage());
            Thread.currentThread().interrupt();
          } catch (Throwable throwable) {
            logger.error("Unknown throwable happened in process capsule loop.", throwable);
          }
        }
      };

processTrigger serializes events through the 7 APIs of the plug-in interface IPluginEventListener. If the consumer is much slower than the producers, the queue backs up. After a while, the node experiences frequent full GCs, can no longer synchronize or serve external requests, and may eventually run out of memory (OOM), leading to data loss.
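To make the backlog argument concrete, here is a small single-threaded simulation, assuming fixed (hypothetical) per-tick production and consumption rates. It is only an illustration of the arithmetic, not a model of the real node:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of java-tron.
class BacklogDemo {
  /**
   * Simulates `ticks` rounds in which producers enqueue `producedPerTick` items
   * and the single consumer dequeues at most `consumedPerTick` items.
   * Returns the number of items left in the queue.
   */
  static int backlogAfter(int ticks, int producedPerTick, int consumedPerTick) {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); // unbounded
    for (int t = 0; t < ticks; t++) {
      for (int i = 0; i < producedPerTick; i++) {
        queue.offer(i);
      }
      for (int i = 0; i < consumedPerTick; i++) {
        if (queue.poll() == null) {
          break; // queue drained, the consumer is keeping up
        }
      }
    }
    return queue.size();
  }

  public static void main(String[] args) {
    System.out.println("slow consumer backlog: " + backlogAfter(100, 10, 4));
    System.out.println("fast consumer backlog: " + backlogAfter(100, 4, 10));
  }
}
```

When production exceeds consumption, the backlog grows linearly with time, by (producedPerTick - consumedPerTick) per tick, and with an unbounded queue nothing stops that growth before the heap is exhausted.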

Possible reasons for slow queue data consumption include:

  1. Insufficient bandwidth between the fullnode and the Mongo server.
  2. Mongo does not have any field index.
  3. Mongo's unique index is not set correctly.

Implementation

One possible approach is to set maximum and minimum thresholds on the queue length and start a monitoring thread. When the queue length exceeds the maximum, this thread suspends block synchronization and broadcasting and promptly alerts the user to the queue overflow; when the length drops below the minimum, it resumes synchronization.
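The two-threshold idea can be sketched as a small state holder that a monitoring thread feeds with the current queue length. The class name, method name, and threshold values below are hypothetical and not taken from java-tron; the point is only the hysteresis: pause above the maximum, and resume only once the length is below the minimum.

```java
// Hypothetical helper, not part of java-tron.
class WatermarkGate {
  private final int low;   // resume when the queue length drops below this
  private final int high;  // pause when the queue length exceeds this
  private boolean paused = false;

  WatermarkGate(int low, int high) {
    if (low < 0 || low >= high) {
      throw new IllegalArgumentException("need 0 <= low < high");
    }
    this.low = low;
    this.high = high;
  }

  /** Feeds the current queue length; returns true while processing should stay paused. */
  synchronized boolean update(int queueLength) {
    if (!paused && queueLength > high) {
      paused = true;   // crossed the maximum: suspend
    } else if (paused && queueLength < low) {
      paused = false;  // dropped below the minimum: resume
    }
    return paused;
  }

  public static void main(String[] args) {
    WatermarkGate gate = new WatermarkGate(200, 1000);
    int[] lengths = {500, 1001, 500, 199, 500};
    for (int length : lengths) {
      System.out.println(length + " -> paused=" + gate.update(length));
    }
  }
}
```

Using two thresholds instead of one avoids rapid pause/resume flapping when the queue length hovers around a single limit.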

@jwrct
Contributor

jwrct commented Feb 9, 2024

Does this issue only exist in the nodes where the event service is enabled? Will data loss occur if it is not optimized?

@317787106
Contributor Author

> Does this issue only exist in the nodes where the event service is enabled? Will data loss occur if it is not optimized?

@jwrct The event service includes the native queue and the event plugins. The issue does not exist in the native queue, namely zmq, but it does exist in the Mongo and Kafka plugins. Data loss may occur in the case described above.

@317787106
Contributor Author

In the Manager class, create a new thread that monitors the capsule queue:

// volatile so that a stop request from another thread is visible to the loop
private volatile boolean isRunMonitorCapsuleQueueThread = true;
...
private Runnable monitorCapsuleQueueLoop =
    () -> {
      final int MAX_QUEUE_SIZE = 1000; // 1000 is an example.
      while (isRunMonitorCapsuleQueueThread) {
        try {
          if (triggerCapsuleQueue.size() > MAX_QUEUE_SIZE) {
            // Hold the block lock while sleeping so that block processing is paused.
            synchronized (tronNetDelegate.getBlockLock()) {
              logger.error("Size of triggerCapsuleQueue is too big {} > {}",
                  triggerCapsuleQueue.size(), MAX_QUEUE_SIZE);
              Thread.sleep(2000);
            }
          } else {
            Thread.sleep(2000);
          }
        } catch (InterruptedException ex) {
          logger.info(ex.getMessage());
          Thread.currentThread().interrupt();
        } catch (Throwable throwable) {
          logger.error("Unknown throwable happened in monitor capsule queue loop.", throwable);
        }
      }
    };

If the queue's size is too large, block synchronization is paused; otherwise it resumes.

halibobo1205 added the label "topic: Event subscribe transaction trigger, block trigger, contract event, contract log" on Feb 28, 2024
@317787106
Contributor Author

317787106 commented Mar 12, 2024

Another possible solution is to set a threshold MAX_QUEUE_SIZE (say 1000). When the length of triggerCapsuleQueue is greater than MAX_QUEUE_SIZE, block processing is suspended; once the length no longer exceeds MAX_QUEUE_SIZE, block processing resumes.

public void pushBlock(final BlockCapsule block)
      throws ValidateSignatureException, ContractValidateException, ContractExeException,
      UnLinkedBlockException, ValidateScheduleException, AccountResourceInsufficientException,
      TaposException, TooBigTransactionException, TooBigTransactionResultException,
      DupTransactionException, TransactionExpirationException,
      BadNumberBlockException, BadBlockException, NonCommonBlockException,
      ReceiptCheckErrException, VMIllegalException, ZksnarkException, EventBloomException {
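    while (triggerCapsuleQueue.size() > MAX_QUEUE_SIZE) {
      logger.error("Size of triggerCapsuleQueue is too big {} > {}, please check if event plugin works",
          triggerCapsuleQueue.size(), MAX_QUEUE_SIZE);
      try {
        Thread.sleep(2000);
      } catch (InterruptedException e) {
        // pushBlock does not declare InterruptedException: restore the flag and stop waiting.
        Thread.currentThread().interrupt();
        break;
      }
    }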
    setBlockWaitLock(true);
    ...
}

If the queue is too large, the thread sleeps for 2000 ms and block processing is paused. The value of MAX_QUEUE_SIZE is based on the number of events in the 200 blocks produced within 10 minutes and still needs to be measured in practice.
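The sizing rule can be written down as simple arithmetic. In the sketch below, the figure of 5 events per block is purely a placeholder chosen so that the result matches the example value of 1000; the real number has to be measured, as noted above. The class and method names are hypothetical.

```java
// Hypothetical helper, not part of java-tron.
class QueueThreshold {
  /** Average block interval implied by `blocks` blocks in `windowMillis` milliseconds. */
  static long blockIntervalMillis(long windowMillis, int blocks) {
    return windowMillis / blocks;
  }

  /** Threshold = measured events per block times the number of blocks to tolerate. */
  static long maxQueueSize(long eventsPerBlock, int blocks) {
    return Math.multiplyExact(eventsPerBlock, (long) blocks);
  }

  public static void main(String[] args) {
    // 200 blocks within 10 minutes implies one block every 3000 ms.
    System.out.println("block interval ms: " + blockIntervalMillis(10 * 60 * 1000L, 200));
    // Placeholder rate of 5 events per block (an assumption, to be measured).
    System.out.println("MAX_QUEUE_SIZE: " + maxQueueSize(5, 200));
  }
}
```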
