// Package airdrop builds, signs and submits token transfer transactions on
// Aptos for pending airdrop records, and polls the chain for their results.
package airdrop

import (
	"context"
	"errors"
	"time"

	"github.com/aptos-labs/aptos-go-sdk"
	"github.com/aptos-labs/aptos-go-sdk/bcs"
	"github.com/aptos-labs/aptos-go-sdk/crypto"
	"github.com/shopspring/decimal"
	"github.com/zeromicro/go-zero/core/logx"

	"nova_task/internal/model"
	"nova_task/internal/svc"
)

// Airdrop holds the Aptos client, the signing account and the service
// dependencies used by the background loop started with Start.
type Airdrop struct {
	ctx    context.Context    // cancelled by Stop; ends the Start loop
	cancel context.CancelFunc // cancels ctx
	svcCtx *svc.ServiceContext
	client *aptos.Client
	sender *aptos.Account // account that signs and pays for every transfer
	//payloads chan aptos.TransactionBuildPayload
	//results chan aptos.TransactionSubmissionResponse
	isTest bool // true selects the testnet coin payload in CoinTransferPayload
}

// NewAirdrop creates an Airdrop from the Aptos settings in svcCtx.Config.
//
// It returns an error when the configured private key is empty or cannot be
// parsed, or when the Aptos client or signer account cannot be created. The
// network is testnet when AptosConf.IsTest is set and mainnet otherwise.
func NewAirdrop(svcCtx *svc.ServiceContext) (*Airdrop, error) {
	if svcCtx.Config.AptosConf.PrivateKey == "" {
		return nil, errors.New("aptos private key is empty")
	}

	var networkConfig aptos.NetworkConfig
	if svcCtx.Config.AptosConf.IsTest {
		networkConfig = aptos.TestnetConfig
	} else {
		networkConfig = aptos.MainnetConfig
	}

	client, err := aptos.NewClient(networkConfig)
	if err != nil {
		return nil, err
	}

	privateKey := &crypto.Ed25519PrivateKey{}
	err = privateKey.FromHex(svcCtx.Config.AptosConf.PrivateKey)
	if err != nil {
		return nil, err
	}

	sender, err := aptos.NewAccountFromSigner(privateKey)
	if err != nil {
		return nil, err
	}

	//payloads := make(chan aptos.TransactionBuildPayload, 100)
	//results := make(chan aptos.TransactionSubmissionResponse, 100)
	//
	//threading.GoSafe(func() {
	//	client.BuildSignAndSubmitTransactions(sender, payloads, results)
	//})

	ctx, cancel := context.WithCancel(context.Background())
	a := &Airdrop{
		ctx:    ctx,
		svcCtx: svcCtx,
		cancel: cancel,
		client: client,
		sender: sender,
		//payloads: payloads,
		//results: results,
		isTest: svcCtx.Config.AptosConf.IsTest,
	}

	//threading.GoSafe(func() {
	//	for result := range results {
	//		a.handleResult(result)
	//	}
	//})

	return a, nil
}

// CoinTransferPayload builds the entry-function payload that transfers amount
// to toAddress.
//
// amount is expressed in whole tokens and is multiplied by 1,000,000 before
// being truncated to an integer number of base units. An error is returned
// when toAddress cannot be parsed, when amount is negative, or when the scaled
// amount does not fit into a signed 64-bit integer.
//
// On testnet the payload is a coin transfer of the coins::USDT type published
// at a fixed contract address; otherwise it is a call to
// 0x1::primary_fungible_store::transfer.
func (a *Airdrop) CoinTransferPayload(toAddress string, amount decimal.Decimal) (*aptos.EntryFunction, error) {
	receiver := aptos.AccountAddress{}
	if err := receiver.ParseStringRelaxed(toAddress); err != nil {
		return nil, err
	}

	// A negative value would wrap around to an enormous uint64 below, so it
	// must be rejected before the conversion.
	if amount.Sign() < 0 {
		return nil, errors.New("airdrop amount must not be negative")
	}
	scaled := amount.Mul(decimal.NewFromInt(1000000))
	// IntPart returns an int64; anything larger cannot be converted reliably.
	if scaled.GreaterThan(decimal.NewFromInt(1<<63 - 1)) {
		return nil, errors.New("airdrop amount is too large")
	}
	amountUint64 := uint64(scaled.IntPart())

	if a.isTest {
		contractAddress := aptos.AccountAddress{}
		err := contractAddress.ParseStringRelaxed("0x43417434fd869edee76cca2a4d2301e528a1551b1d719b75c350c3c97d15b8b9")
		if err != nil {
			return nil, err
		}
		coinType := &aptos.TypeTag{
			Value: &aptos.StructTag{
				Address:    contractAddress,
				Module:     "coins",
				Name:       "USDT",
				TypeParams: []aptos.TypeTag{}, // USDT has no additional type parameters
			},
		}
		return aptos.CoinTransferPayload(coinType, receiver, amountUint64)
	}

	amountBytes, err := bcs.SerializeU64(amountUint64)
	if err != nil {
		return nil, err
	}

	coinType := aptos.TypeTag{
		Value: &aptos.StructTag{
			Address:    aptos.AccountOne,
			Module:     "fungible_asset",
			Name:       "Metadata",
			TypeParams: []aptos.TypeTag{}, // USDT has no additional type parameters
		},
	}

	// NOTE(review): primary_fungible_store::transfer appears to take the
	// asset's metadata object address before the recipient and the amount,
	// but only the recipient and amount are passed here. Confirm against the
	// Aptos framework and add the metadata address argument if required.
	return &aptos.EntryFunction{
		Module: aptos.ModuleId{
			Address: aptos.AccountOne,
			Name:    "primary_fungible_store",
		},
		Function: "transfer",
		ArgTypes: []aptos.TypeTag{coinType},
		Args: [][]byte{
			receiver[:],
			amountBytes,
		},
	}, nil
}

//func (a *Airdrop) transferUsdt(id uint64, toAddress string, amount decimal.Decimal) error {
//	p, err := a.CoinTransferPayload(toAddress, amount)
//	if err != nil {
//		return err
//	}
//
//	a.payloads <- aptos.TransactionBuildPayload{
//		Id:    id,
//		Type:  aptos.TransactionSubmissionTypeSingle,
//		Inner: aptos.TransactionPayload{Payload: p},
//	}
//	result := <-a.results
//	result.Id = id
//	a.handleResult(result)
//	return nil
//}

//func (a *Airdrop) handleResult(result aptos.TransactionSubmissionResponse) {
//	if result.Err != nil {
//		logx.Errorw("submit transaction error", logx.Field("err", result.Err), logx.Field("id", result.Id))
//		var txHash string
//		if result.Response != nil {
//			txHash = result.Response.Hash
//		}
//		a.svcCtx.AirdropModel.SetFailed(a.ctx, uint(result.Id), txHash)
//		return
//	}
//	if result.Response != nil {
//		logx.Infow("submit transaction success", logx.Field("id", result.Id), logx.Field("txHash", result.Response.Hash))
//		a.svcCtx.AirdropModel.SetTxHash(a.ctx, uint(result.Id), result.Response.Hash)
//	}
//}

//func (a *Airdrop) processTx(tx *model.NhAirdropLog, isRetry bool) {
//	if !a.svcCtx.AirdropModel.SetSent(a.ctx, tx.Id, "", isRetry) {
//		return
//	}
//	err := a.transferUsdt(uint64(tx.Id), tx.Address, tx.Amount)
//	if err != nil {
//		logx.Errorw("transfer usdt error", logx.Field("err", err), logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
//		a.svcCtx.AirdropModel.SetIllegal(a.ctx, tx.Id)
//	} else {
//		logx.Infow("submit transaction success", logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
//	}
//}

// buildTxs builds the transfer transaction for tx with the given sequence
// number and signs it with the sender account.
func (a *Airdrop) buildTxs(tx *model.NhAirdropLog, seqNum aptos.SequenceNumber) (*aptos.SignedTransaction, error) {
	p, err := a.CoinTransferPayload(tx.Address, tx.Amount)
	if err != nil {
		return nil, err
	}

	rx, err := a.client.BuildTransaction(a.sender.AccountAddress(), aptos.TransactionPayload{Payload: p}, seqNum)
	if err != nil {
		return nil, err
	}

	return rx.SignedTransaction(a.sender)
}

// processTxs signs one transaction per airdrop record and submits them as a
// single batch.
//
// Records whose transaction cannot be built are marked illegal; records that
// SetSent refuses are skipped. Sequence numbers start at the sender's current
// on-chain value and advance only for transactions that are actually queued,
// so a skipped record never leaves a gap in the sequence. Transactions the
// node reports as failed in the batch response are marked failed.
func (a *Airdrop) processTxs(txs []*model.NhAirdropLog) {
	if len(txs) == 0 {
		return
	}

	ac, err := a.client.Account(a.sender.AccountAddress())
	if err != nil {
		logx.Errorw("get account info error", logx.Field("err", err))
		return
	}
	seqNo, err := ac.SequenceNumber()
	if err != nil {
		logx.Errorw("get account sequence number error", logx.Field("err", err))
		return
	}

	signedTransactions := make([]*aptos.SignedTransaction, 0, len(txs))
	txIds := make([]uint, 0, len(txs))

	for _, tx := range txs {
		btx, err := a.buildTxs(tx, aptos.SequenceNumber(seqNo))
		if err != nil {
			logx.Errorw("build transaction error", logx.Field("err", err), logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
			a.svcCtx.AirdropModel.SetIllegal(a.ctx, tx.Id)
			continue
		}

		// The hash is best effort: an empty hash is stored when it cannot be
		// computed.
		var txHash string
		if hash, err := btx.Hash(); err == nil {
			txHash = hash
		}

		if !a.svcCtx.AirdropModel.SetSent(a.ctx, tx.Id, txHash, tx.Status == model.AirdropStatusFailed.String()) {
			logx.Infow("set sent error", logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
			continue
		}

		signedTransactions = append(signedTransactions, btx)
		txIds = append(txIds, tx.Id)
		// Consume the sequence number only now that the transaction is queued.
		seqNo++
		logx.Debugw("build transaction success", logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
	}

	// Nothing survived building and marking; do not send an empty batch.
	if len(signedTransactions) == 0 {
		return
	}

	faileds, err := a.client.BatchSubmitTransaction(signedTransactions)
	if err != nil {
		logx.Errorw("submit transaction error", logx.Field("err", err))
		return
	}
	if faileds == nil {
		return
	}

	for _, tf := range faileds.TransactionFailures {
		idx := int(tf.TransactionIndex)
		// Guard against an index that does not refer to a submitted entry.
		if idx < 0 || idx >= len(signedTransactions) {
			logx.Errorw("submit transaction failure index out of range", logx.Field("err", tf.Error), logx.Field("index", idx))
			continue
		}

		var txHash string
		if hash, err := signedTransactions[idx].Hash(); err == nil {
			txHash = hash
		}
		id := txIds[idx]
		logx.Errorw("submit transaction error", logx.Field("err", tf.Error), logx.Field("id", id), logx.Field("txHash", txHash))
		a.svcCtx.AirdropModel.SetFailed(a.ctx, id, txHash)
	}
}

// checkTx looks up the transaction recorded for t by hash and marks the record
// successful when the chain reports success.
//
// NOTE(review): a transaction that is reported as not successful is only
// logged here; its record status is not changed by this function.
func (a *Airdrop) checkTx(t *model.NhAirdropLog) {
	tx, err := a.client.TransactionByHash(t.TxHash)
	if err != nil {
		logx.Errorw("get transaction by hash error", logx.Field("err", err), logx.Field("hash", t.TxHash), logx.Field("id", t.Id))
		return
	}

	success := tx.Success()
	if success != nil && *success {
		logx.Infow("transaction success", logx.Field("hash", t.TxHash), logx.Field("id", t.Id), logx.Field("address", t.Address), logx.Field("amount", t.Amount.String()))
		a.svcCtx.AirdropModel.SetSuccess(a.ctx, t.Id)
	} else {
		logx.Infow("transaction not success", logx.Field("hash", t.TxHash), logx.Field("id", t.Id))
	}
}

// Start runs the airdrop loop until Stop is called. It blocks the caller.
//
// Every 15 seconds it loads newly imported records and failed records that
// need resending and submits them; every 13 seconds it checks the on-chain
// result of records that require confirmation.
func (a *Airdrop) Start() {
	tkDuration := time.Second * 15
	txTk := time.NewTicker(tkDuration)
	defer txTk.Stop()
	checkTk := time.NewTicker(time.Second * 13)
	defer checkTk.Stop()

	for {
		select {
		case <-a.ctx.Done():
			return
		case <-txTk.C:
			var airdropList []*model.NhAirdropLog

			// Newly imported transactions.
			txs1, err := a.svcCtx.AirdropModel.FindAirdropNewImportList(a.ctx, 100)
			if err != nil {
				logx.Errorw("find airdrop list error", logx.Field("err", err))
			} else if len(txs1) > 0 {
				airdropList = append(airdropList, txs1...)
			}

			// Failed transactions that need to be resent.
			txs2, err := a.svcCtx.AirdropModel.FindAirdropFailedList(a.ctx, 20)
			if err != nil {
				logx.Errorw("find airdrop filed list error", logx.Field("err", err))
			} else if len(txs2) > 0 {
				airdropList = append(airdropList, txs2...)
			}

			a.processTxs(airdropList)
			// Restart the interval after the batch has been processed.
			txTk.Reset(tkDuration)
		case <-checkTk.C:
			txs, err := a.svcCtx.AirdropModel.FindRequireCheckList(a.ctx, 100)
			if err != nil {
				logx.Errorw("find airdrop list error", logx.Field("err", err))
				continue
			}
			for _, tx := range txs {
				a.checkTx(tx)
			}
		}
	}
}

// Stop cancels the context, which makes Start return.
func (a *Airdrop) Stop() {
	a.cancel()
	//close(a.payloads)
	//close(a.results)
}