modify airdrop
This commit is contained in:
@@ -189,10 +189,12 @@ func (a *Airdrop) buildTxs(tx *model.NhAirdropLog, seqNum aptos.SequenceNumber)
|
||||
return rx.SignedTransaction(a.sender)
|
||||
}
|
||||
|
||||
func (a *Airdrop) processTxs(txs []*model.NhAirdropLog, isRetry bool) {
|
||||
func (a *Airdrop) processTxs(txs []*model.NhAirdropLog) {
|
||||
if len(txs) == 0 {
|
||||
return
|
||||
}
|
||||
var signedTransactions []*aptos.SignedTransaction
|
||||
var txIds []uint
|
||||
logx.Debugw("build transactions", logx.Field("count", len(txs)), logx.Field("isRetry", isRetry))
|
||||
ac, err := a.client.Account(a.sender.AccountAddress())
|
||||
if err != nil {
|
||||
logx.Errorw("get account info error", logx.Field("err", err))
|
||||
@@ -216,7 +218,7 @@ func (a *Airdrop) processTxs(txs []*model.NhAirdropLog, isRetry bool) {
|
||||
if hash, err := btx.Hash(); err == nil {
|
||||
txHash = hash
|
||||
}
|
||||
if !a.svcCtx.AirdropModel.SetSent(a.ctx, tx.Id, txHash, isRetry) {
|
||||
if !a.svcCtx.AirdropModel.SetSent(a.ctx, tx.Id, txHash, tx.Status == model.AirdropStatusFailed.String()) {
|
||||
logx.Infow("set sent error", logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
|
||||
continue
|
||||
}
|
||||
@@ -260,24 +262,34 @@ func (a *Airdrop) checkTx(t *model.NhAirdropLog) {
|
||||
}
|
||||
|
||||
func (a *Airdrop) Start() {
|
||||
newImportTk := time.NewTicker(time.Second * 10)
|
||||
tkDuration := time.Second * 15
|
||||
txTk := time.NewTicker(tkDuration)
|
||||
checkTk := time.NewTicker(time.Second * 13)
|
||||
retryTk := time.NewTicker(time.Second * 17)
|
||||
for {
|
||||
select {
|
||||
case <-a.ctx.Done():
|
||||
return
|
||||
case <-newImportTk.C:
|
||||
txs, err := a.svcCtx.AirdropModel.FindAirdropNewImportList(a.ctx, 100)
|
||||
case <-txTk.C:
|
||||
var airdropList []*model.NhAirdropLog
|
||||
|
||||
// 新导入的交易
|
||||
txs1, err := a.svcCtx.AirdropModel.FindAirdropNewImportList(a.ctx, 100)
|
||||
if err != nil {
|
||||
logx.Errorw("find airdrop list error", logx.Field("err", err))
|
||||
continue
|
||||
} else if len(txs1) > 0 {
|
||||
airdropList = append(airdropList, txs1...)
|
||||
}
|
||||
if len(txs) == 0 {
|
||||
logx.Debugw("find airdrop list empty", logx.Field("count", len(txs)))
|
||||
continue
|
||||
|
||||
// 失败需要重发的交易
|
||||
txs2, err := a.svcCtx.AirdropModel.FindAirdropFailedList(a.ctx, 20)
|
||||
if err != nil {
|
||||
logx.Errorw("find airdrop filed list error", logx.Field("err", err))
|
||||
} else if len(txs2) > 0 {
|
||||
airdropList = append(airdropList, txs2...)
|
||||
}
|
||||
a.processTxs(txs, false)
|
||||
a.processTxs(airdropList)
|
||||
txTk.Reset(tkDuration)
|
||||
|
||||
case <-checkTk.C:
|
||||
txs, err := a.svcCtx.AirdropModel.FindRequireCheckList(a.ctx, 100)
|
||||
if err != nil {
|
||||
@@ -287,17 +299,6 @@ func (a *Airdrop) Start() {
|
||||
for _, tx := range txs {
|
||||
a.checkTx(tx)
|
||||
}
|
||||
case <-retryTk.C:
|
||||
txs, err := a.svcCtx.AirdropModel.FindAirdropFailedList(a.ctx, 100)
|
||||
if err != nil {
|
||||
logx.Errorw("find airdrop filed list error", logx.Field("err", err))
|
||||
continue
|
||||
}
|
||||
if len(txs) == 0 {
|
||||
logx.Debugw("find airdrop filed list empty", logx.Field("count", len(txs)))
|
||||
continue
|
||||
}
|
||||
a.processTxs(txs, true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user