Files
novatask/internal/job/airdrop/airdrop.go
2025-02-21 18:27:21 +08:00

311 lines
8.9 KiB
Go

package airdrop
import (
"context"
"errors"
"github.com/aptos-labs/aptos-go-sdk"
"github.com/aptos-labs/aptos-go-sdk/bcs"
"github.com/aptos-labs/aptos-go-sdk/crypto"
"github.com/shopspring/decimal"
"github.com/zeromicro/go-zero/core/logx"
"nova_task/internal/model"
"nova_task/internal/svc"
"time"
)
// Airdrop is a worker that sends queued airdrop transfers from a single
// Aptos account and records their status through svcCtx.AirdropModel.
type Airdrop struct {
	// svcCtx provides the configuration and the AirdropModel used to read
	// and update airdrop rows.
	svcCtx *svc.ServiceContext

	// client talks to the Aptos network chosen in NewAirdrop.
	client *aptos.Client
	// sender is the account whose address and key are used to build and
	// sign every transfer.
	sender *aptos.Account
	// isTest mirrors AptosConf.IsTest and selects the testnet payload in
	// CoinTransferPayload.
	isTest bool

	// ctx is watched by Start and passed to the model calls; cancel is
	// invoked by Stop.
	ctx    context.Context
	cancel context.CancelFunc

	// Fields of the disabled channel-based submission pipeline:
	//payloads chan aptos.TransactionBuildPayload
	//results chan aptos.TransactionSubmissionResponse
}
// NewAirdrop creates an Airdrop worker from the Aptos settings in svcCtx.
//
// AptosConf.IsTest selects the testnet configuration; otherwise mainnet is
// used. An error is returned when the private key is empty, when the Aptos
// client cannot be created, when the key is not valid hex for an Ed25519
// private key, or when the signer account cannot be derived from it.
func NewAirdrop(svcCtx *svc.ServiceContext) (*Airdrop, error) {
	if svcCtx.Config.AptosConf.PrivateKey == "" {
		return nil, errors.New("aptos private key is empty")
	}

	// Default to mainnet and switch only when the test flag is set.
	isTest := svcCtx.Config.AptosConf.IsTest
	network := aptos.MainnetConfig
	if isTest {
		network = aptos.TestnetConfig
	}

	client, err := aptos.NewClient(network)
	if err != nil {
		return nil, err
	}

	key := new(crypto.Ed25519PrivateKey)
	if err = key.FromHex(svcCtx.Config.AptosConf.PrivateKey); err != nil {
		return nil, err
	}

	sender, err := aptos.NewAccountFromSigner(key)
	if err != nil {
		return nil, err
	}

	// NOTE: the channel-based BuildSignAndSubmitTransactions pipeline is
	// disabled; transactions are submitted in batches by processTxs.
	ctx, cancel := context.WithCancel(context.Background())
	return &Airdrop{
		ctx:    ctx,
		cancel: cancel,
		svcCtx: svcCtx,
		client: client,
		sender: sender,
		isTest: isTest,
	}, nil
}
// CoinTransferPayload builds the entry-function payload that transfers amount
// to toAddress.
//
// amount is multiplied by 1,000,000 to obtain the on-chain integer amount;
// any remaining fraction is truncated. Negative amounts and amounts that do
// not fit in a uint64 after scaling are rejected, because converting them
// blindly would wrap around into a huge unsigned value.
//
// On testnet (a.isTest) the payload is a coin transfer of the test USDT coin
// type; otherwise it is a call to 0x1::primary_fungible_store::transfer.
//
// Returns an error when toAddress cannot be parsed, when amount is out of
// range, or when the payload cannot be serialized.
func (a *Airdrop) CoinTransferPayload(toAddress string, amount decimal.Decimal) (*aptos.EntryFunction, error) {
	receiver := aptos.AccountAddress{}
	if err := receiver.ParseStringRelaxed(toAddress); err != nil {
		return nil, err
	}

	if amount.IsNegative() {
		return nil, errors.New("airdrop amount must not be negative")
	}
	// BigInt keeps only the integer part, like the previous IntPart call, but
	// lets us detect values that would overflow instead of wrapping.
	units := amount.Mul(decimal.NewFromInt(1000000)).BigInt()
	if !units.IsUint64() {
		return nil, errors.New("airdrop amount overflows uint64")
	}
	amountUint64 := units.Uint64()

	if a.isTest {
		contractAddress := aptos.AccountAddress{}
		if err := contractAddress.ParseStringRelaxed("0x43417434fd869edee76cca2a4d2301e528a1551b1d719b75c350c3c97d15b8b9"); err != nil {
			return nil, err
		}
		coinType := &aptos.TypeTag{
			Value: &aptos.StructTag{
				Address:    contractAddress,
				Module:     "coins",
				Name:       "USDT",
				TypeParams: []aptos.TypeTag{}, // USDT has no additional type parameters
			},
		}
		return aptos.CoinTransferPayload(coinType, receiver, amountUint64)
	}

	amountBytes, err := bcs.SerializeU64(amountUint64)
	if err != nil {
		return nil, err
	}
	// Type argument for transfer<T>: 0x1::fungible_asset::Metadata.
	metadataType := aptos.TypeTag{
		Value: &aptos.StructTag{
			Address:    aptos.AccountOne,
			Module:     "fungible_asset",
			Name:       "Metadata",
			TypeParams: []aptos.TypeTag{}, // Metadata has no additional type parameters
		},
	}
	// NOTE(review): 0x1::primary_fungible_store::transfer<T> is declared as
	// (sender, metadata: Object<T>, recipient, amount). Only recipient and
	// amount are supplied below — confirm whether the asset's metadata object
	// address is missing as the first argument.
	return &aptos.EntryFunction{
		Module: aptos.ModuleId{
			Address: aptos.AccountOne,
			Name:    "primary_fungible_store",
		},
		Function: "transfer",
		ArgTypes: []aptos.TypeTag{metadataType},
		Args: [][]byte{
			receiver[:],
			amountBytes,
		},
	}, nil
}
//func (a *Airdrop) transferUsdt(id uint64, toAddress string, amount decimal.Decimal) error {
// p, err := a.CoinTransferPayload(toAddress, amount)
// if err != nil {
// return err
// }
//
// a.payloads <- aptos.TransactionBuildPayload{
// Id: id,
// Type: aptos.TransactionSubmissionTypeSingle,
// Inner: aptos.TransactionPayload{Payload: p},
// }
// result := <-a.results
// result.Id = id
// a.handleResult(result)
// return nil
//}
//func (a *Airdrop) handleResult(result aptos.TransactionSubmissionResponse) {
// if result.Err != nil {
// logx.Errorw("submit transaction error", logx.Field("err", result.Err), logx.Field("id", result.Id))
// var txHash string
// if result.Response != nil {
// txHash = result.Response.Hash
// }
// a.svcCtx.AirdropModel.SetFailed(a.ctx, uint(result.Id), txHash)
// return
// }
// if result.Response != nil {
// logx.Infow("submit transaction success", logx.Field("id", result.Id), logx.Field("txHash", result.Response.Hash))
// a.svcCtx.AirdropModel.SetTxHash(a.ctx, uint(result.Id), result.Response.Hash)
// }
//}
//func (a *Airdrop) processTx(tx *model.NhAirdropLog, isRetry bool) {
// if !a.svcCtx.AirdropModel.SetSent(a.ctx, tx.Id, "", isRetry) {
// return
// }
// err := a.transferUsdt(uint64(tx.Id), tx.Address, tx.Amount)
// if err != nil {
// logx.Errorw("transfer usdt error", logx.Field("err", err), logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
// a.svcCtx.AirdropModel.SetIllegal(a.ctx, tx.Id)
// } else {
// logx.Infow("submit transaction success", logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
// }
//}
// buildTxs builds the transfer payload for tx, wraps it in a transaction
// from the sender account with sequence number seqNum, and signs it with the
// sender. Any error from those three steps is returned unchanged.
func (a *Airdrop) buildTxs(tx *model.NhAirdropLog, seqNum aptos.SequenceNumber) (*aptos.SignedTransaction, error) {
	payload, err := a.CoinTransferPayload(tx.Address, tx.Amount)
	if err != nil {
		return nil, err
	}

	from := a.sender.AccountAddress()
	rawTxn, err := a.client.BuildTransaction(from, aptos.TransactionPayload{Payload: payload}, seqNum)
	if err != nil {
		return nil, err
	}

	return rawTxn.SignedTransaction(a.sender)
}
// processTxs signs the given airdrop rows as consecutive transactions of the
// sender account and submits them in a single batch.
//
// For every row it builds and signs a transaction, marks the row as sent
// (recording the transaction hash when it can be computed), and queues the
// transaction. Rows whose transaction cannot be built are marked illegal;
// rows that cannot be marked as sent are skipped. Transactions reported as
// failed by the batch submission are marked failed.
//
// The sequence number advances only after a transaction has actually been
// queued, so a skipped row does not leave a gap that would prevent the
// following transactions from executing.
func (a *Airdrop) processTxs(txs []*model.NhAirdropLog) {
	if len(txs) == 0 {
		return
	}

	ac, err := a.client.Account(a.sender.AccountAddress())
	if err != nil {
		logx.Errorw("get account info error", logx.Field("err", err))
		return
	}
	nextSeq, err := ac.SequenceNumber()
	if err != nil {
		logx.Errorw("get account sequence number error", logx.Field("err", err))
		return
	}

	signedTransactions := make([]*aptos.SignedTransaction, 0, len(txs))
	txIds := make([]uint, 0, len(txs))
	for _, tx := range txs {
		btx, err := a.buildTxs(tx, aptos.SequenceNumber(nextSeq))
		if err != nil {
			logx.Errorw("build transaction error", logx.Field("err", err), logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
			a.svcCtx.AirdropModel.SetIllegal(a.ctx, tx.Id)
			continue
		}

		// The hash is best-effort: an empty hash is stored when it cannot be
		// computed.
		var txHash string
		if hash, err := btx.Hash(); err == nil {
			txHash = hash
		}
		if !a.svcCtx.AirdropModel.SetSent(a.ctx, tx.Id, txHash, tx.Status == model.AirdropStatusFailed.String()) {
			logx.Infow("set sent error", logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
			continue
		}

		signedTransactions = append(signedTransactions, btx)
		txIds = append(txIds, tx.Id)
		// Consume the sequence number only now that the transaction is queued.
		nextSeq++
		logx.Debugw("build transaction success", logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
	}

	// Nothing was queued: avoid submitting an empty batch.
	if len(signedTransactions) == 0 {
		return
	}

	failures, err := a.client.BatchSubmitTransaction(signedTransactions)
	if err != nil {
		// Rows already marked as sent are left untouched here.
		logx.Errorw("submit transaction error", logx.Field("err", err))
		return
	}

	for _, tf := range failures.TransactionFailures {
		idx := int(tf.TransactionIndex)
		if idx < 0 || idx >= len(signedTransactions) {
			logx.Errorw("submit transaction error: failure index out of range", logx.Field("err", tf.Error), logx.Field("index", idx))
			continue
		}

		var txHash string
		if hash, err := signedTransactions[idx].Hash(); err == nil {
			txHash = hash
		}
		id := txIds[idx]
		logx.Errorw("submit transaction error", logx.Field("err", tf.Error), logx.Field("id", id), logx.Field("txHash", txHash))
		a.svcCtx.AirdropModel.SetFailed(a.ctx, id, txHash)
	}
}
// checkTx looks up the transaction recorded for t by its hash and marks the
// row as successful when the node reports success.
//
// A lookup error, a missing success flag, or a false success flag only
// produces a log entry; the row is not modified in those cases.
func (a *Airdrop) checkTx(t *model.NhAirdropLog) {
	tx, err := a.client.TransactionByHash(t.TxHash)
	if err != nil {
		logx.Errorw("get transaction by hash error", logx.Field("err", err), logx.Field("hash", t.TxHash), logx.Field("id", t.Id))
		return
	}

	// Success returns a pointer; nil and false are both treated as "not success".
	if ok := tx.Success(); ok == nil || !*ok {
		logx.Infow("transaction not success", logx.Field("hash", t.TxHash), logx.Field("id", t.Id))
		return
	}

	logx.Infow("transaction success", logx.Field("hash", t.TxHash), logx.Field("id", t.Id), logx.Field("address", t.Address), logx.Field("amount", t.Amount.String()))
	a.svcCtx.AirdropModel.SetSuccess(a.ctx, t.Id)
}
// Start runs the worker loop until the Airdrop context is cancelled.
//
// Two tickers drive the loop:
//   - every 15 seconds it loads up to 100 newly imported rows and up to 20
//     failed rows and hands them to processTxs;
//   - every 13 seconds it loads up to 100 rows that require checking and
//     passes each one to checkTx.
//
// Start blocks the calling goroutine. Both tickers are stopped when it
// returns.
func (a *Airdrop) Start() {
	tkDuration := time.Second * 15
	txTk := time.NewTicker(tkDuration)
	defer txTk.Stop()
	checkTk := time.NewTicker(time.Second * 13)
	defer checkTk.Stop()

	for {
		select {
		case <-a.ctx.Done():
			return
		case <-txTk.C:
			var airdropList []*model.NhAirdropLog
			// Newly imported transactions.
			txs1, err := a.svcCtx.AirdropModel.FindAirdropNewImportList(a.ctx, 100)
			if err != nil {
				logx.Errorw("find airdrop list error", logx.Field("err", err))
			} else if len(txs1) > 0 {
				airdropList = append(airdropList, txs1...)
			}
			// Failed transactions that need to be resent.
			txs2, err := a.svcCtx.AirdropModel.FindAirdropFailedList(a.ctx, 20)
			if err != nil {
				logx.Errorw("find airdrop filed list error", logx.Field("err", err))
			} else if len(txs2) > 0 {
				airdropList = append(airdropList, txs2...)
			}
			a.processTxs(airdropList)
			// Restart the interval after processing so the next run is a full
			// period away from the end of this one.
			txTk.Reset(tkDuration)
		case <-checkTk.C:
			txs, err := a.svcCtx.AirdropModel.FindRequireCheckList(a.ctx, 100)
			if err != nil {
				logx.Errorw("find airdrop list error", logx.Field("err", err))
				continue
			}
			for _, tx := range txs {
				a.checkTx(tx)
			}
		}
	}
}
// Stop cancels the worker context, which makes a running Start loop return.
func (a *Airdrop) Stop() {
	// The payload/result channels of the disabled submission pipeline are no
	// longer created, so there is nothing else to close here.
	a.cancel()
}