diff --git a/internal/job/airdrop/airdrop.go b/internal/job/airdrop/airdrop.go index 1981387..d7634cd 100644 --- a/internal/job/airdrop/airdrop.go +++ b/internal/job/airdrop/airdrop.go @@ -189,10 +189,12 @@ func (a *Airdrop) buildTxs(tx *model.NhAirdropLog, seqNum aptos.SequenceNumber) return rx.SignedTransaction(a.sender) } -func (a *Airdrop) processTxs(txs []*model.NhAirdropLog, isRetry bool) { +func (a *Airdrop) processTxs(txs []*model.NhAirdropLog) { + if len(txs) == 0 { + return + } var signedTransactions []*aptos.SignedTransaction var txIds []uint - logx.Debugw("build transactions", logx.Field("count", len(txs)), logx.Field("isRetry", isRetry)) ac, err := a.client.Account(a.sender.AccountAddress()) if err != nil { logx.Errorw("get account info error", logx.Field("err", err)) @@ -216,7 +218,7 @@ func (a *Airdrop) processTxs(txs []*model.NhAirdropLog, isRetry bool) { if hash, err := btx.Hash(); err == nil { txHash = hash } - if !a.svcCtx.AirdropModel.SetSent(a.ctx, tx.Id, txHash, isRetry) { + if !a.svcCtx.AirdropModel.SetSent(a.ctx, tx.Id, txHash, tx.Status == model.AirdropStatusFailed.String()) { logx.Infow("set sent error", logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String())) continue } @@ -260,24 +262,34 @@ func (a *Airdrop) checkTx(t *model.NhAirdropLog) { } func (a *Airdrop) Start() { - newImportTk := time.NewTicker(time.Second * 10) + tkDuration := time.Second * 15 + txTk := time.NewTicker(tkDuration) checkTk := time.NewTicker(time.Second * 13) - retryTk := time.NewTicker(time.Second * 17) for { select { case <-a.ctx.Done(): return - case <-newImportTk.C: - txs, err := a.svcCtx.AirdropModel.FindAirdropNewImportList(a.ctx, 100) + case <-txTk.C: + var airdropList []*model.NhAirdropLog + + // 新导入的交易 + txs1, err := a.svcCtx.AirdropModel.FindAirdropNewImportList(a.ctx, 100) if err != nil { logx.Errorw("find airdrop list error", logx.Field("err", err)) - continue + } else if len(txs1) > 0 { + airdropList = append(airdropList, txs1...) } - if len(txs) == 0 { - logx.Debugw("find airdrop list empty", logx.Field("count", len(txs))) - continue + + // 失败需要重发的交易 + txs2, err := a.svcCtx.AirdropModel.FindAirdropFailedList(a.ctx, 20) + if err != nil { + logx.Errorw("find airdrop filed list error", logx.Field("err", err)) + } else if len(txs2) > 0 { + airdropList = append(airdropList, txs2...) } - a.processTxs(txs, false) + a.processTxs(airdropList) + txTk.Reset(tkDuration) + case <-checkTk.C: txs, err := a.svcCtx.AirdropModel.FindRequireCheckList(a.ctx, 100) if err != nil { @@ -287,17 +299,6 @@ func (a *Airdrop) Start() { for _, tx := range txs { a.checkTx(tx) } - case <-retryTk.C: - txs, err := a.svcCtx.AirdropModel.FindAirdropFailedList(a.ctx, 100) - if err != nil { - logx.Errorw("find airdrop filed list error", logx.Field("err", err)) - continue - } - if len(txs) == 0 { - logx.Debugw("find airdrop filed list empty", logx.Field("count", len(txs))) - continue - } - a.processTxs(txs, true) } } }