311 lines
8.9 KiB
Go
311 lines
8.9 KiB
Go
package airdrop
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"github.com/aptos-labs/aptos-go-sdk"
|
|
"github.com/aptos-labs/aptos-go-sdk/bcs"
|
|
"github.com/aptos-labs/aptos-go-sdk/crypto"
|
|
"github.com/shopspring/decimal"
|
|
"github.com/zeromicro/go-zero/core/logx"
|
|
"nova_task/internal/model"
|
|
"nova_task/internal/svc"
|
|
"time"
|
|
)
|
|
|
|
type Airdrop struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
svcCtx *svc.ServiceContext
|
|
client *aptos.Client
|
|
sender *aptos.Account
|
|
|
|
//payloads chan aptos.TransactionBuildPayload
|
|
//results chan aptos.TransactionSubmissionResponse
|
|
|
|
isTest bool
|
|
}
|
|
|
|
func NewAirdrop(svcCtx *svc.ServiceContext) (*Airdrop, error) {
|
|
if svcCtx.Config.AptosConf.PrivateKey == "" {
|
|
return nil, errors.New("aptos private key is empty")
|
|
}
|
|
var networkConfig aptos.NetworkConfig
|
|
if svcCtx.Config.AptosConf.IsTest {
|
|
networkConfig = aptos.TestnetConfig
|
|
} else {
|
|
networkConfig = aptos.MainnetConfig
|
|
}
|
|
client, err := aptos.NewClient(networkConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
privateKey := &crypto.Ed25519PrivateKey{}
|
|
err = privateKey.FromHex(svcCtx.Config.AptosConf.PrivateKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sender, err := aptos.NewAccountFromSigner(privateKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
//payloads := make(chan aptos.TransactionBuildPayload, 100)
|
|
//results := make(chan aptos.TransactionSubmissionResponse, 100)
|
|
//
|
|
//threading.GoSafe(func() {
|
|
// client.BuildSignAndSubmitTransactions(sender, payloads, results)
|
|
//})
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
a := &Airdrop{
|
|
ctx: ctx,
|
|
svcCtx: svcCtx,
|
|
cancel: cancel,
|
|
client: client,
|
|
sender: sender,
|
|
//payloads: payloads,
|
|
//results: results,
|
|
isTest: svcCtx.Config.AptosConf.IsTest,
|
|
}
|
|
|
|
//threading.GoSafe(func() {
|
|
// for result := range results {
|
|
// a.handleResult(result)
|
|
// }
|
|
//})
|
|
|
|
return a, nil
|
|
}
|
|
|
|
func (a *Airdrop) CoinTransferPayload(toAddress string, amount decimal.Decimal) (*aptos.EntryFunction, error) {
|
|
receiver := aptos.AccountAddress{}
|
|
err := receiver.ParseStringRelaxed(toAddress)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
amountUint64 := uint64(amount.Mul(decimal.NewFromInt(1000000)).IntPart())
|
|
if a.isTest {
|
|
contractAddress := aptos.AccountAddress{}
|
|
err = contractAddress.ParseStringRelaxed("0x43417434fd869edee76cca2a4d2301e528a1551b1d719b75c350c3c97d15b8b9")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
coinType := &aptos.TypeTag{
|
|
Value: &aptos.StructTag{
|
|
Address: contractAddress,
|
|
Module: "coins",
|
|
Name: "USDT",
|
|
TypeParams: []aptos.TypeTag{}, // USDT 没有额外的类型参数
|
|
},
|
|
}
|
|
return aptos.CoinTransferPayload(coinType, receiver, amountUint64)
|
|
}
|
|
|
|
amountBytes, err := bcs.SerializeU64(amountUint64)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
coinType := aptos.TypeTag{
|
|
Value: &aptos.StructTag{
|
|
Address: aptos.AccountOne,
|
|
Module: "fungible_asset",
|
|
Name: "Metadata",
|
|
TypeParams: []aptos.TypeTag{}, // USDT 没有额外的类型参数
|
|
},
|
|
}
|
|
return &aptos.EntryFunction{
|
|
Module: aptos.ModuleId{
|
|
Address: aptos.AccountOne,
|
|
Name: "primary_fungible_store",
|
|
},
|
|
Function: "transfer",
|
|
ArgTypes: []aptos.TypeTag{coinType},
|
|
Args: [][]byte{
|
|
receiver[:],
|
|
amountBytes,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
//func (a *Airdrop) transferUsdt(id uint64, toAddress string, amount decimal.Decimal) error {
|
|
// p, err := a.CoinTransferPayload(toAddress, amount)
|
|
// if err != nil {
|
|
// return err
|
|
// }
|
|
//
|
|
// a.payloads <- aptos.TransactionBuildPayload{
|
|
// Id: id,
|
|
// Type: aptos.TransactionSubmissionTypeSingle,
|
|
// Inner: aptos.TransactionPayload{Payload: p},
|
|
// }
|
|
// result := <-a.results
|
|
// result.Id = id
|
|
// a.handleResult(result)
|
|
// return nil
|
|
//}
|
|
|
|
//func (a *Airdrop) handleResult(result aptos.TransactionSubmissionResponse) {
|
|
// if result.Err != nil {
|
|
// logx.Errorw("submit transaction error", logx.Field("err", result.Err), logx.Field("id", result.Id))
|
|
// var txHash string
|
|
// if result.Response != nil {
|
|
// txHash = result.Response.Hash
|
|
// }
|
|
// a.svcCtx.AirdropModel.SetFailed(a.ctx, uint(result.Id), txHash)
|
|
// return
|
|
// }
|
|
// if result.Response != nil {
|
|
// logx.Infow("submit transaction success", logx.Field("id", result.Id), logx.Field("txHash", result.Response.Hash))
|
|
// a.svcCtx.AirdropModel.SetTxHash(a.ctx, uint(result.Id), result.Response.Hash)
|
|
// }
|
|
//}
|
|
|
|
//func (a *Airdrop) processTx(tx *model.NhAirdropLog, isRetry bool) {
|
|
// if !a.svcCtx.AirdropModel.SetSent(a.ctx, tx.Id, "", isRetry) {
|
|
// return
|
|
// }
|
|
// err := a.transferUsdt(uint64(tx.Id), tx.Address, tx.Amount)
|
|
// if err != nil {
|
|
// logx.Errorw("transfer usdt error", logx.Field("err", err), logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
|
|
// a.svcCtx.AirdropModel.SetIllegal(a.ctx, tx.Id)
|
|
// } else {
|
|
// logx.Infow("submit transaction success", logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
|
|
// }
|
|
//}
|
|
|
|
func (a *Airdrop) buildTxs(tx *model.NhAirdropLog, seqNum aptos.SequenceNumber) (*aptos.SignedTransaction, error) {
|
|
p, err := a.CoinTransferPayload(tx.Address, tx.Amount)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rx, err := a.client.BuildTransaction(a.sender.AccountAddress(), aptos.TransactionPayload{Payload: p}, seqNum)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return rx.SignedTransaction(a.sender)
|
|
}
|
|
|
|
func (a *Airdrop) processTxs(txs []*model.NhAirdropLog) {
|
|
if len(txs) == 0 {
|
|
return
|
|
}
|
|
var signedTransactions []*aptos.SignedTransaction
|
|
var txIds []uint
|
|
ac, err := a.client.Account(a.sender.AccountAddress())
|
|
if err != nil {
|
|
logx.Errorw("get account info error", logx.Field("err", err))
|
|
return
|
|
}
|
|
seqNo, err := ac.SequenceNumber()
|
|
if err != nil {
|
|
logx.Errorw("get account sequence number error", logx.Field("err", err))
|
|
return
|
|
}
|
|
seqNo--
|
|
for _, tx := range txs {
|
|
seqNo++
|
|
btx, err := a.buildTxs(tx, aptos.SequenceNumber(seqNo))
|
|
if err != nil {
|
|
logx.Errorw("build transaction error", logx.Field("err", err), logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
|
|
a.svcCtx.AirdropModel.SetIllegal(a.ctx, tx.Id)
|
|
continue
|
|
}
|
|
var txHash string
|
|
if hash, err := btx.Hash(); err == nil {
|
|
txHash = hash
|
|
}
|
|
if !a.svcCtx.AirdropModel.SetSent(a.ctx, tx.Id, txHash, tx.Status == model.AirdropStatusFailed.String()) {
|
|
logx.Infow("set sent error", logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
|
|
continue
|
|
}
|
|
signedTransactions = append(signedTransactions, btx)
|
|
txIds = append(txIds, tx.Id)
|
|
logx.Debugw("build transaction success", logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
|
|
}
|
|
faileds, err := a.client.BatchSubmitTransaction(signedTransactions)
|
|
if err != nil {
|
|
logx.Errorw("submit transaction error", logx.Field("err", err))
|
|
return
|
|
}
|
|
|
|
for _, tf := range faileds.TransactionFailures {
|
|
ix := signedTransactions[int(tf.TransactionIndex)]
|
|
var txHash string
|
|
if hash, err := ix.Hash(); err == nil {
|
|
txHash = hash
|
|
}
|
|
|
|
id := txIds[int(tf.TransactionIndex)]
|
|
|
|
logx.Errorw("submit transaction error", logx.Field("err", tf.Error), logx.Field("id", id), logx.Field("txHash", txHash))
|
|
a.svcCtx.AirdropModel.SetFailed(a.ctx, id, txHash)
|
|
}
|
|
}
|
|
|
|
func (a *Airdrop) checkTx(t *model.NhAirdropLog) {
|
|
tx, err := a.client.TransactionByHash(t.TxHash)
|
|
if err != nil {
|
|
logx.Errorw("get transaction by hash error", logx.Field("err", err), logx.Field("hash", t.TxHash), logx.Field("id", t.Id))
|
|
return
|
|
}
|
|
success := tx.Success()
|
|
if success != nil && *success {
|
|
logx.Infow("transaction success", logx.Field("hash", t.TxHash), logx.Field("id", t.Id), logx.Field("address", t.Address), logx.Field("amount", t.Amount.String()))
|
|
a.svcCtx.AirdropModel.SetSuccess(a.ctx, t.Id)
|
|
} else {
|
|
logx.Infow("transaction not success", logx.Field("hash", t.TxHash), logx.Field("id", t.Id))
|
|
}
|
|
}
|
|
|
|
func (a *Airdrop) Start() {
|
|
tkDuration := time.Second * 15
|
|
txTk := time.NewTicker(tkDuration)
|
|
checkTk := time.NewTicker(time.Second * 13)
|
|
for {
|
|
select {
|
|
case <-a.ctx.Done():
|
|
return
|
|
case <-txTk.C:
|
|
var airdropList []*model.NhAirdropLog
|
|
|
|
// 新导入的交易
|
|
txs1, err := a.svcCtx.AirdropModel.FindAirdropNewImportList(a.ctx, 100)
|
|
if err != nil {
|
|
logx.Errorw("find airdrop list error", logx.Field("err", err))
|
|
} else if len(txs1) > 0 {
|
|
airdropList = append(airdropList, txs1...)
|
|
}
|
|
|
|
// 失败需要重发的交易
|
|
txs2, err := a.svcCtx.AirdropModel.FindAirdropFailedList(a.ctx, 20)
|
|
if err != nil {
|
|
logx.Errorw("find airdrop filed list error", logx.Field("err", err))
|
|
} else if len(txs2) > 0 {
|
|
airdropList = append(airdropList, txs2...)
|
|
}
|
|
a.processTxs(airdropList)
|
|
txTk.Reset(tkDuration)
|
|
|
|
case <-checkTk.C:
|
|
txs, err := a.svcCtx.AirdropModel.FindRequireCheckList(a.ctx, 100)
|
|
if err != nil {
|
|
logx.Errorw("find airdrop list error", logx.Field("err", err))
|
|
continue
|
|
}
|
|
for _, tx := range txs {
|
|
a.checkTx(tx)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *Airdrop) Stop() {
|
|
a.cancel()
|
|
//close(a.payloads)
|
|
//close(a.results)
|
|
}
|