批量空投

This commit is contained in:
2025-02-19 21:23:09 +08:00
parent e01dd4f18c
commit 7b2501c46e
14 changed files with 653 additions and 157 deletions

View File

@@ -1,10 +1,10 @@
FROM golang:1.23.4-alpine AS builder
FROM golang:1.24.0-alpine AS builder
LABEL stage=gobuilder
ENV CGO_ENABLED=0
ENV GOPROXY=https://goproxy.cn,direct
RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
#RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
RUN apk update --no-cache && apk add --no-cache tzdata

View File

@@ -1,4 +1,4 @@
host ?= 192.168.2.109:3306
host ?= 192.168.2.108:3306
user ?= huangjie
pwd ?= jMDqPQM^a6hsAR
table ?=

View File

@@ -4,9 +4,9 @@ CREATE TABLE `nh_task_progress`
`uid` int(11) unsigned NOT NULL,
`task_id` int(11) unsigned NOT NULL COMMENT '任务id',
`task_seq` int(11) NOT NULL COMMENT '用于可重复任务的序列号',
`stage` tinyint NOT NULL DEFAULT 0 COMMENT '任务的阶段, 0:未完成 1:待校验 2:已完成未领取 3:已领取',
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`stage` tinyint NOT NULL DEFAULT 0 COMMENT '任务的阶段, 0:未完成 1:待校验 2:已完成未领取 3:已领取',
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uid_task_id_seq` (`uid`, `task_id`, `task_seq`)
) COMMENT ='用户任务节点';
@@ -14,12 +14,12 @@ CREATE TABLE `nh_task_progress`
CREATE TABLE `nh_nft_holder`
(
`id` int unsigned NOT NULL AUTO_INCREMENT,
`address` varchar(80) NOT NULL COMMENT '钱包地址',
`token_id` varchar(32) NOT NULL COMMENT 'token id',
`address` varchar(80) NOT NULL COMMENT '钱包地址',
`token_id` varchar(32) NOT NULL COMMENT 'token id',
`balance` int(11) NOT NULL DEFAULT 0 COMMENT '余额',
`update_seq` int NOT NULL COMMENT '更新序列号',
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`update_seq` int NOT NULL COMMENT '更新序列号',
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY (`token_id`),
INDEX (`update_seq`)
@@ -28,12 +28,12 @@ CREATE TABLE `nh_nft_holder`
CREATE TABLE `nh_nft_holder_change_log`
(
`id` int unsigned NOT NULL AUTO_INCREMENT,
`address` varchar(80) NOT NULL COMMENT '钱包地址',
`token_id` varchar(32) NOT NULL COMMENT 'token id',
`address` varchar(80) NOT NULL COMMENT '钱包地址',
`token_id` varchar(32) NOT NULL COMMENT 'token id',
`value` int(11) NOT NULL COMMENT '变化数量',
`balance` int(11) NOT NULL COMMENT '余额',
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`)
) COMMENT ='nft 持有表变化日志';
@@ -42,11 +42,11 @@ CREATE TABLE `nh_task_nft_stake`
`id` int unsigned NOT NULL AUTO_INCREMENT,
`uid` int unsigned NOT NULL COMMENT '用户钱包',
`role_id` bigint unsigned NOT NULL COMMENT '角色id',
`token_id` varchar(32) NOT NULL COMMENT 'token id',
`token_id` varchar(32) NOT NULL COMMENT 'token id',
`type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '类型0=小塔罗1=大塔罗',
`state` tinyint NOT NULL DEFAULT 0 COMMENT '状态1质押中 0已取消质押',
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`state` tinyint NOT NULL DEFAULT 0 COMMENT '状态1质押中 0已取消质押',
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY (`token_id`)
) COMMENT ='nft质押表';
@@ -56,16 +56,16 @@ CREATE TABLE `nh_task_nft_stake_log`
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`uid` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '用户钱包',
`role_id` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '角色id',
`token_id` varchar(32) NOT NULL COMMENT 'token id',
`token_id` varchar(32) NOT NULL COMMENT 'token id',
`operate` tinyint(4) NOT NULL DEFAULT '0' COMMENT '状态1质押 2取消质押, 3转出',
`callback_status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '下发通知状态:0未通知,1已通知,2通知异常',
`callback_num` int(10) NOT NULL DEFAULT '0' COMMENT '发送通知次数',
`callback_at` timestamp NULL DEFAULT NULL COMMENT '发送通知最新时间',
`callback_remark` varchar(255) NOT NULL DEFAULT '' COMMENT '通知回调备注',
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`callback_at` timestamp NULL DEFAULT NULL COMMENT '发送通知最新时间',
`callback_remark` varchar(255) NOT NULL DEFAULT '' COMMENT '通知回调备注',
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
KEY `callback_status` (`callback_status`) USING BTREE
KEY `callback_status` (`callback_status`) USING BTREE
) COMMENT ='nft质押日志表';
CREATE TABLE `nh_task_nft_stake_reward`
@@ -97,3 +97,13 @@ CREATE TABLE `nh_email_reward`
`updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`)
) COMMENT ='需要发放积分的';
CREATE TABLE `nh_nft_tarot`
(
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`token_id` varchar(32) NOT NULL COMMENT 'token id',
`tarot_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '类型0=小塔罗1=大塔罗',
`tarot_img` varchar(128) NOT NULL COMMENT '塔罗图片',
PRIMARY KEY (`id`),
UNIQUE KEY (`token_id`)
) COMMENT ='塔罗信息';

View File

@@ -19,7 +19,7 @@ Auth: # js-sdk鉴权相关配置
AccessSecret: "Mj2G%szYe&$MP@ytNv8JktQN1n5^cPq%" # 鉴权token密钥
MySql: # mysql相关配置
Addr: "192.168.20.101:3306" # mysql地址
Addr: "192.168.2.108:3306" # mysql地址
User: "huangjie" # mysql用户
Password: "jMDqPQM^a6hsAR" # mysql密码
Database: "nova_home" # 数据库名
@@ -41,6 +41,10 @@ Earn:
ClientSecret: "GJpQ4TmX4p2VMY7U3XtExZQKYfibMv24"
GameId: "c0deda99-bb15-47a2-a3be-f1fe2983cde2"
AptosConf:
PrivateKey: "0xd5ff131abc602f96192d3d23531cf1577394f4997f4bffeb249fe605be688ad0"
IsTest: true
EarnCorn:
Spec: "" #"@every 5m"
RunOnStart : false

2
go.mod
View File

@@ -1,6 +1,6 @@
module nova_task
go 1.23.4
go 1.24
require (
github.com/aptos-labs/aptos-go-sdk v1.4.1

View File

@@ -30,6 +30,10 @@ type Config struct {
SettleSpec string
HolderCheckRunOnStart bool `json:",optional"`
} `json:",optional"`
AptosConf struct {
PrivateKey string
IsTest bool `json:",default=false"`
} `json:",optional"`
}
type Cron struct {

View File

@@ -0,0 +1,309 @@
package airdrop
import (
"context"
"errors"
"github.com/aptos-labs/aptos-go-sdk"
"github.com/aptos-labs/aptos-go-sdk/bcs"
"github.com/aptos-labs/aptos-go-sdk/crypto"
"github.com/shopspring/decimal"
"github.com/zeromicro/go-zero/core/logx"
"nova_task/internal/model"
"nova_task/internal/svc"
"time"
)
type Airdrop struct {
ctx context.Context
cancel context.CancelFunc
svcCtx *svc.ServiceContext
client *aptos.Client
sender *aptos.Account
//payloads chan aptos.TransactionBuildPayload
//results chan aptos.TransactionSubmissionResponse
isTest bool
}
func NewAirdrop(svcCtx *svc.ServiceContext) (*Airdrop, error) {
if svcCtx.Config.AptosConf.PrivateKey == "" {
return nil, errors.New("aptos private key is empty")
}
var networkConfig aptos.NetworkConfig
if svcCtx.Config.AptosConf.IsTest {
networkConfig = aptos.TestnetConfig
} else {
networkConfig = aptos.MainnetConfig
}
client, err := aptos.NewClient(networkConfig)
if err != nil {
return nil, err
}
privateKey := &crypto.Ed25519PrivateKey{}
err = privateKey.FromHex(svcCtx.Config.AptosConf.PrivateKey)
if err != nil {
return nil, err
}
sender, err := aptos.NewAccountFromSigner(privateKey)
if err != nil {
return nil, err
}
//payloads := make(chan aptos.TransactionBuildPayload, 100)
//results := make(chan aptos.TransactionSubmissionResponse, 100)
//
//threading.GoSafe(func() {
// client.BuildSignAndSubmitTransactions(sender, payloads, results)
//})
ctx, cancel := context.WithCancel(context.Background())
a := &Airdrop{
ctx: ctx,
svcCtx: svcCtx,
cancel: cancel,
client: client,
sender: sender,
//payloads: payloads,
//results: results,
isTest: svcCtx.Config.AptosConf.IsTest,
}
//threading.GoSafe(func() {
// for result := range results {
// a.handleResult(result)
// }
//})
return a, nil
}
func (a *Airdrop) CoinTransferPayload(toAddress string, amount decimal.Decimal) (*aptos.EntryFunction, error) {
receiver := aptos.AccountAddress{}
err := receiver.ParseStringRelaxed(toAddress)
if err != nil {
return nil, err
}
amountUint64 := uint64(amount.Mul(decimal.NewFromInt(1000000)).IntPart())
if a.isTest {
contractAddress := aptos.AccountAddress{}
err = contractAddress.ParseStringRelaxed("0x43417434fd869edee76cca2a4d2301e528a1551b1d719b75c350c3c97d15b8b9")
if err != nil {
return nil, err
}
coinType := &aptos.TypeTag{
Value: &aptos.StructTag{
Address: contractAddress,
Module: "coins",
Name: "USDT",
TypeParams: []aptos.TypeTag{}, // USDT 没有额外的类型参数
},
}
return aptos.CoinTransferPayload(coinType, receiver, amountUint64)
}
amountBytes, err := bcs.SerializeU64(amountUint64)
if err != nil {
return nil, err
}
coinType := aptos.TypeTag{
Value: &aptos.StructTag{
Address: aptos.AccountOne,
Module: "fungible_asset",
Name: "Metadata",
TypeParams: []aptos.TypeTag{}, // USDT 没有额外的类型参数
},
}
return &aptos.EntryFunction{
Module: aptos.ModuleId{
Address: aptos.AccountOne,
Name: "primary_fungible_store",
},
Function: "transfer",
ArgTypes: []aptos.TypeTag{coinType},
Args: [][]byte{
receiver[:],
amountBytes,
},
}, nil
}
//func (a *Airdrop) transferUsdt(id uint64, toAddress string, amount decimal.Decimal) error {
// p, err := a.CoinTransferPayload(toAddress, amount)
// if err != nil {
// return err
// }
//
// a.payloads <- aptos.TransactionBuildPayload{
// Id: id,
// Type: aptos.TransactionSubmissionTypeSingle,
// Inner: aptos.TransactionPayload{Payload: p},
// }
// result := <-a.results
// result.Id = id
// a.handleResult(result)
// return nil
//}
//func (a *Airdrop) handleResult(result aptos.TransactionSubmissionResponse) {
// if result.Err != nil {
// logx.Errorw("submit transaction error", logx.Field("err", result.Err), logx.Field("id", result.Id))
// var txHash string
// if result.Response != nil {
// txHash = result.Response.Hash
// }
// a.svcCtx.AirdropModel.SetFailed(a.ctx, uint(result.Id), txHash)
// return
// }
// if result.Response != nil {
// logx.Infow("submit transaction success", logx.Field("id", result.Id), logx.Field("txHash", result.Response.Hash))
// a.svcCtx.AirdropModel.SetTxHash(a.ctx, uint(result.Id), result.Response.Hash)
// }
//}
//func (a *Airdrop) processTx(tx *model.NhAirdropLog, isRetry bool) {
// if !a.svcCtx.AirdropModel.SetSent(a.ctx, tx.Id, "", isRetry) {
// return
// }
// err := a.transferUsdt(uint64(tx.Id), tx.Address, tx.Amount)
// if err != nil {
// logx.Errorw("transfer usdt error", logx.Field("err", err), logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
// a.svcCtx.AirdropModel.SetIllegal(a.ctx, tx.Id)
// } else {
// logx.Infow("submit transaction success", logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
// }
//}
func (a *Airdrop) buildTxs(tx *model.NhAirdropLog, seqNum aptos.SequenceNumber) (*aptos.SignedTransaction, error) {
p, err := a.CoinTransferPayload(tx.Address, tx.Amount)
if err != nil {
return nil, err
}
rx, err := a.client.BuildTransaction(a.sender.AccountAddress(), aptos.TransactionPayload{Payload: p}, seqNum)
if err != nil {
return nil, err
}
return rx.SignedTransaction(a.sender)
}
func (a *Airdrop) processTxs(txs []*model.NhAirdropLog, isRetry bool) {
var signedTransactions []*aptos.SignedTransaction
var txIds []uint
logx.Debugw("build transactions", logx.Field("count", len(txs)), logx.Field("isRetry", isRetry))
ac, err := a.client.Account(a.sender.AccountAddress())
if err != nil {
logx.Errorw("get account info error", logx.Field("err", err))
return
}
seqNo, err := ac.SequenceNumber()
if err != nil {
logx.Errorw("get account sequence number error", logx.Field("err", err))
return
}
seqNo--
for _, tx := range txs {
seqNo++
btx, err := a.buildTxs(tx, aptos.SequenceNumber(seqNo))
if err != nil {
logx.Errorw("build transaction error", logx.Field("err", err), logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
a.svcCtx.AirdropModel.SetIllegal(a.ctx, tx.Id)
continue
}
var txHash string
if hash, err := btx.Hash(); err == nil {
txHash = hash
}
if !a.svcCtx.AirdropModel.SetSent(a.ctx, tx.Id, txHash, isRetry) {
logx.Infow("set sent error", logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
continue
}
signedTransactions = append(signedTransactions, btx)
txIds = append(txIds, tx.Id)
logx.Debugw("build transaction success", logx.Field("id", tx.Id), logx.Field("address", tx.Address), logx.Field("amount", tx.Amount.String()))
}
faileds, err := a.client.BatchSubmitTransaction(signedTransactions)
if err != nil {
logx.Errorw("submit transaction error", logx.Field("err", err))
return
}
for _, tf := range faileds.TransactionFailures {
ix := signedTransactions[int(tf.TransactionIndex)]
var txHash string
if hash, err := ix.Hash(); err == nil {
txHash = hash
}
id := txIds[int(tf.TransactionIndex)]
logx.Errorw("submit transaction error", logx.Field("err", tf.Error), logx.Field("id", id), logx.Field("txHash", txHash))
a.svcCtx.AirdropModel.SetFailed(a.ctx, id, txHash)
}
}
func (a *Airdrop) checkTx(t *model.NhAirdropLog) {
tx, err := a.client.TransactionByHash(t.TxHash)
if err != nil {
logx.Errorw("get transaction by hash error", logx.Field("err", err), logx.Field("hash", t.TxHash), logx.Field("id", t.Id))
return
}
success := tx.Success()
if success != nil && *success {
logx.Infow("transaction success", logx.Field("hash", t.TxHash), logx.Field("id", t.Id), logx.Field("address", t.Address), logx.Field("amount", t.Amount.String()))
a.svcCtx.AirdropModel.SetSuccess(a.ctx, t.Id)
} else {
logx.Infow("transaction not success", logx.Field("hash", t.TxHash), logx.Field("id", t.Id))
}
}
func (a *Airdrop) Start() {
newImportTk := time.NewTicker(time.Second * 10)
checkTk := time.NewTicker(time.Second * 13)
retryTk := time.NewTicker(time.Second * 17)
for {
select {
case <-a.ctx.Done():
return
case <-newImportTk.C:
txs, err := a.svcCtx.AirdropModel.FindAirdropNewImportList(a.ctx, 100)
if err != nil {
logx.Errorw("find airdrop list error", logx.Field("err", err))
continue
}
if len(txs) == 0 {
logx.Debugw("find airdrop list empty", logx.Field("count", len(txs)))
continue
}
a.processTxs(txs, false)
case <-checkTk.C:
txs, err := a.svcCtx.AirdropModel.FindRequireCheckList(a.ctx, 100)
if err != nil {
logx.Errorw("find airdrop list error", logx.Field("err", err))
continue
}
for _, tx := range txs {
a.checkTx(tx)
}
case <-retryTk.C:
txs, err := a.svcCtx.AirdropModel.FindAirdropFailedList(a.ctx, 100)
if err != nil {
logx.Errorw("find airdrop filed list error", logx.Field("err", err))
continue
}
if len(txs) == 0 {
logx.Debugw("find airdrop filed list empty", logx.Field("count", len(txs)))
continue
}
a.processTxs(txs, true)
}
}
}
func (a *Airdrop) Stop() {
a.cancel()
//close(a.payloads)
//close(a.results)
}

View File

@@ -1,116 +0,0 @@
package apt
import (
"github.com/aptos-labs/aptos-go-sdk"
"github.com/aptos-labs/aptos-go-sdk/crypto"
"github.com/zeromicro/go-zero/core/threading"
"nova_task/internal/svc"
)
type Apt struct {
svcCtx svc.ServiceContext
client *aptos.Client
sender *aptos.Account
payloads chan aptos.TransactionBuildPayload
results chan aptos.TransactionSubmissionResponse
isTest bool
}
func NewApt(svcCtx svc.ServiceContext, privateKeyStr string, isTest bool) (*Apt, error) {
var networkConfig aptos.NetworkConfig
if isTest {
networkConfig = aptos.TestnetConfig
} else {
networkConfig = aptos.MainnetConfig
}
client, err := aptos.NewClient(networkConfig)
if err != nil {
return nil, err
}
privateKey := &crypto.Ed25519PrivateKey{}
err = privateKey.FromHex(privateKeyStr)
if err != nil {
return nil, err
}
sender, err := aptos.NewAccountFromSigner(privateKey)
if err != nil {
return nil, err
}
payloads := make(chan aptos.TransactionBuildPayload, 100)
results := make(chan aptos.TransactionSubmissionResponse, 100)
threading.GoSafe(func() {
client.BuildSignAndSubmitTransactions(sender, payloads, results)
})
a := &Apt{
client: client,
sender: sender,
payloads: payloads,
results: results,
isTest: isTest,
}
threading.GoSafe(func() {
for result := range results {
a.handleResult(result)
}
})
return a, nil
}
func (a *Apt) transferUsdt(id uint64, toAddress string, amount uint64) error {
receiver := aptos.AccountAddress{}
err := receiver.ParseStringRelaxed(toAddress)
if err != nil {
return err
}
// 0x1::primary_fungible_store::transfer
// fungible_asset::Metadata
contractAddress := aptos.AccountAddress{}
err = receiver.ParseStringRelaxed("0x43417434fd869edee76cca2a4d2301e528a1551b1d719b75c350c3c97d15b8b9")
if err != nil {
return err
}
coinType := &aptos.TypeTag{
Value: &aptos.StructTag{
Address: contractAddress,
Module: "coin",
Name: "USDT",
TypeParams: []aptos.TypeTag{}, // USDT 没有额外的类型参数
},
}
p, err := aptos.CoinTransferPayload(coinType, receiver, amount)
if err != nil {
return err
}
a.payloads <- aptos.TransactionBuildPayload{
Id: id,
Type: aptos.TransactionSubmissionTypeSingle,
Inner: aptos.TransactionPayload{Payload: p},
}
return nil
}
func (a *Apt) handleResult(result aptos.TransactionSubmissionResponse) {
}
func (a *Apt) Start() {
a.svcCtx.
}
func (a *Apt) Stop() {
close(a.payloads)
close(a.results)
}

View File

@@ -18,7 +18,7 @@ var cronList = []func(context.Context, *svc.ServiceContext) cron.Job{
stake_settle.NewCron,
}
type Job struct {
type Corns struct {
ctx context.Context
cancel context.CancelFunc
svcCtx *svc.ServiceContext
@@ -29,8 +29,8 @@ type Spec interface {
Spec() string
}
// NewJob 创建定时器服务
func NewJob(svcCtx *svc.ServiceContext) *Job {
// NewCorns 创建定时器服务
func newCorns(svcCtx *svc.ServiceContext) *Corns {
ctx, cancel := context.WithCancel(context.Background())
var err error
c := cron.New(cron.WithSeconds())
@@ -39,7 +39,7 @@ func NewJob(svcCtx *svc.ServiceContext) *Job {
if cs, ok := cr.(Spec); ok {
spec := cs.Spec()
if spec == "" {
logx.Errorw("cron job spec is empty")
logx.Errorw("cron Corns spec is empty")
continue
}
_, err = c.AddJob(spec, cr)
@@ -51,9 +51,9 @@ func NewJob(svcCtx *svc.ServiceContext) *Job {
c.Schedule(cs, cr)
continue
}
logx.Must(errors.New("cron job must implement either Spec or Schedule interface"))
logx.Must(errors.New("cron Corns must implement either Spec or Schedule interface"))
}
return &Job{
return &Corns{
ctx: ctx,
cancel: cancel,
svcCtx: svcCtx,
@@ -61,14 +61,14 @@ func NewJob(svcCtx *svc.ServiceContext) *Job {
}
}
func (j *Job) Start() {
logx.Info("start cron job")
func (j *Corns) Start() {
logx.Info("start cron Corns")
j.c.Start()
}
func (j *Job) Stop() {
logx.Info("stop cron job")
func (j *Corns) Stop() {
logx.Info("stop cron Corns")
<-j.c.Stop().Done()
logx.Info("cron job stopped")
logx.Info("cron Corns stopped")
j.cancel()
}

21
internal/job/jobs.go Normal file
View File

@@ -0,0 +1,21 @@
package job
import (
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/service"
"nova_task/internal/job/airdrop"
"nova_task/internal/svc"
)
func BuildJobs(svcCtx *svc.ServiceContext) []service.Service {
ss := []service.Service{
newCorns(svcCtx),
}
airdropService, err := airdrop.NewAirdrop(svcCtx)
if err != nil {
logx.Errorw("airdrop service init error", logx.Field("err", err))
} else {
ss = append(ss, airdropService)
}
return ss
}

View File

@@ -0,0 +1,162 @@
package model
import (
"context"
"errors"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"time"
)
var _ NhAirdropLogModel = (*customNhAirdropLogModel)(nil)
const (
AirdropStatusNewImport StatusType = "新导入"
AirdropStatusSent StatusType = "已发送交易"
AirdropStatusSuccess StatusType = "交易成功"
AirdropStatusFailed StatusType = "交易失败"
AirdropStatusReSent StatusType = "已重新发送交易"
AirdropStatusIllegal StatusType = "非法交易"
)
type (
// NhAirdropLogModel is an interface to be customized, add more methods here,
// and implement the added methods in customNhAirdropLogModel.
NhAirdropLogModel interface {
nhAirdropLogModel
withSession(session sqlx.Session) NhAirdropLogModel
FindAirdropNewImportList(ctx context.Context, count int) ([]*NhAirdropLog, error)
FindAirdropFailedList(ctx context.Context, count int) ([]*NhAirdropLog, error)
FindRequireCheckList(ctx context.Context, count int) ([]*NhAirdropLog, error)
SetSent(ctx context.Context, id uint, txHash string, isRetry bool) bool
SetIllegal(ctx context.Context, id uint) bool
SetSuccess(ctx context.Context, id uint) bool
SetFailed(ctx context.Context, id uint, txHash string) bool
SetTxHash(ctx context.Context, id uint, txHash string)
}
customNhAirdropLogModel struct {
*defaultNhAirdropLogModel
}
StatusType string
)
// NewNhAirdropLogModel returns a model for the database table.
func NewNhAirdropLogModel(conn sqlx.SqlConn) NhAirdropLogModel {
return &customNhAirdropLogModel{
defaultNhAirdropLogModel: newNhAirdropLogModel(conn),
}
}
func (m *customNhAirdropLogModel) SetTxHash(ctx context.Context, id uint, txHash string) {
update := fmt.Sprintf("update %s set `tx_hash` = ? where `id` = ?", m.table)
_, err := m.conn.ExecCtx(ctx, update, txHash, id)
if err != nil {
logx.Errorw("set txHash error", logx.Field("err", err), logx.Field("id", id))
}
}
func (m *customNhAirdropLogModel) SetFailed(ctx context.Context, id uint, txHash string) bool {
update := fmt.Sprintf("update %s set `status` = '%s', `tx_hash` = ? where `id` = ? and (`status` = '%s' or `status` = '%s')", m.table, AirdropStatusFailed, AirdropStatusSent, AirdropStatusReSent)
result, err := m.conn.ExecCtx(ctx, update, txHash, id)
if err != nil {
logx.Errorw("set failed error", logx.Field("err", err), logx.Field("id", id))
return false
}
rows, err := result.RowsAffected()
if err != nil {
logx.Errorw("set failed RowsAffected error", logx.Field("err", err), logx.Field("id", id))
return false
}
return rows > 0
}
func (m *customNhAirdropLogModel) SetSuccess(ctx context.Context, id uint) bool {
update := fmt.Sprintf("update %s set `status` = '%s' where `id` = ? and (`status` = '%s' or `status` = '%s')", m.table, AirdropStatusSuccess, AirdropStatusSent, AirdropStatusReSent)
result, err := m.conn.ExecCtx(ctx, update, id)
if err != nil {
logx.Errorw("set success error", logx.Field("err", err), logx.Field("id", id))
return false
}
rows, err := result.RowsAffected()
if err != nil {
logx.Errorw("set success RowsAffected error", logx.Field("err", err), logx.Field("id", id))
return false
}
return rows > 0
}
func (t StatusType) String() string {
return string(t)
}
func (m *customNhAirdropLogModel) SetIllegal(ctx context.Context, id uint) bool {
update := fmt.Sprintf("update %s set `status` = '%s' where `id` = ? and (`status` = '%s' or `status` = '%s')", m.table, AirdropStatusIllegal, AirdropStatusSent, AirdropStatusReSent)
result, err := m.conn.ExecCtx(ctx, update, id)
if err != nil {
logx.Errorw("set illegal error", logx.Field("err", err), logx.Field("id", id))
return false
}
rows, err := result.RowsAffected()
if err != nil {
logx.Errorw("set illegal RowsAffected error", logx.Field("err", err), logx.Field("id", id))
return false
}
return rows > 0
}
func (m *customNhAirdropLogModel) SetSent(ctx context.Context, id uint, txHash string, isRetry bool) bool {
var update string
if isRetry {
update = fmt.Sprintf("update %s set `status` = '%s', `tx_hash` = ?, `submit_at` = ? where `id` = ? and `status` = '%s'", m.table, AirdropStatusReSent, AirdropStatusFailed)
} else {
update = fmt.Sprintf("update %s set `status` = '%s', `tx_hash` = ?, `submit_at` = ? where `id` = ? and `status` = '%s'", m.table, AirdropStatusSent, AirdropStatusNewImport)
}
result, err := m.conn.ExecCtx(ctx, update, txHash, time.Now(), id)
if err != nil {
logx.Errorw("set sent error", logx.Field("err", err), logx.Field("id", id))
return false
}
rows, err := result.RowsAffected()
if err != nil {
logx.Errorw("set sent RowsAffected error", logx.Field("err", err), logx.Field("id", id))
return false
}
return rows > 0
}
func (m *customNhAirdropLogModel) FindAirdropNewImportList(ctx context.Context, count int) ([]*NhAirdropLog, error) {
query := fmt.Sprintf("select %s from %s where `status` = '%s' and `is_del` = 0 limit ?", nhAirdropLogRows, m.table, AirdropStatusNewImport)
var tasks []*NhAirdropLog
err := m.conn.QueryRowsCtx(ctx, &tasks, query, count)
if err != nil && !errors.Is(err, sqlx.ErrNotFound) {
return nil, err
}
return tasks, nil
}
func (m *customNhAirdropLogModel) FindAirdropFailedList(ctx context.Context, count int) ([]*NhAirdropLog, error) {
query := fmt.Sprintf("select %s from %s where `status` = '%s' and `is_del` = 0 limit ?", nhAirdropLogRows, m.table, AirdropStatusFailed)
var tasks []*NhAirdropLog
err := m.conn.QueryRowsCtx(ctx, &tasks, query, count)
if err != nil && !errors.Is(err, sqlx.ErrNotFound) {
return nil, err
}
return tasks, nil
}
func (m *customNhAirdropLogModel) FindRequireCheckList(ctx context.Context, count int) ([]*NhAirdropLog, error) {
query := fmt.Sprintf("select %s from %s where (`status` = '%s' or `status` = '%s') and `is_del` = 0 limit ?", nhAirdropLogRows, m.table, AirdropStatusSent, AirdropStatusReSent)
var tasks []*NhAirdropLog
err := m.conn.QueryRowsCtx(ctx, &tasks, query, count)
if err != nil && !errors.Is(err, sqlx.ErrNotFound) {
return nil, err
}
return tasks, nil
}
func (m *customNhAirdropLogModel) withSession(session sqlx.Session) NhAirdropLogModel {
return NewNhAirdropLogModel(sqlx.NewSqlConnFromSession(session))
}

View File

@@ -0,0 +1,98 @@
// Code generated by goctl. DO NOT EDIT.
// versions:
// goctl version: 1.7.6
package model
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"github.com/zeromicro/go-zero/core/stores/builder"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"github.com/zeromicro/go-zero/core/stringx"
"github.com/shopspring/decimal"
)
var (
nhAirdropLogFieldNames = builder.RawFieldNames(&NhAirdropLog{})
nhAirdropLogRows = strings.Join(nhAirdropLogFieldNames, ",")
nhAirdropLogRowsExpectAutoSet = strings.Join(stringx.Remove(nhAirdropLogFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",")
nhAirdropLogRowsWithPlaceHolder = strings.Join(stringx.Remove(nhAirdropLogFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?"
)
type (
nhAirdropLogModel interface {
Insert(ctx context.Context, data *NhAirdropLog) (sql.Result, error)
FindOne(ctx context.Context, id uint) (*NhAirdropLog, error)
Update(ctx context.Context, data *NhAirdropLog) error
Delete(ctx context.Context, id uint) error
}
defaultNhAirdropLogModel struct {
conn sqlx.SqlConn
table string
}
NhAirdropLog struct {
Id uint `db:"id"`
Address string `db:"address"` // 用户钱包地址
Amount decimal.Decimal `db:"amount"` // 空投数量
Status string `db:"status"` // 状态
TxHash string `db:"tx_hash"` // 交易hash
SubmitAt sql.NullTime `db:"submit_at"` // 提交时间
BatchDate sql.NullTime `db:"batch_date"` // 批次-日期
BatchNum uint `db:"batch_num"` // 批次-数量
AdminId uint `db:"admin_id"` // 管理员ID
IsDel int8 `db:"is_del"` // 是否已删除:0否,1是
CreatedAt time.Time `db:"created_at"` // 创建时间
UpdatedAt time.Time `db:"updated_at"` // 修改时间
}
)
func newNhAirdropLogModel(conn sqlx.SqlConn) *defaultNhAirdropLogModel {
return &defaultNhAirdropLogModel{
conn: conn,
table: "`nh_airdrop_log`",
}
}
func (m *defaultNhAirdropLogModel) Delete(ctx context.Context, id uint) error {
query := fmt.Sprintf("delete from %s where `id` = ?", m.table)
_, err := m.conn.ExecCtx(ctx, query, id)
return err
}
func (m *defaultNhAirdropLogModel) FindOne(ctx context.Context, id uint) (*NhAirdropLog, error) {
query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", nhAirdropLogRows, m.table)
var resp NhAirdropLog
err := m.conn.QueryRowCtx(ctx, &resp, query, id)
switch err {
case nil:
return &resp, nil
case sqlx.ErrNotFound:
return nil, ErrNotFound
default:
return nil, err
}
}
func (m *defaultNhAirdropLogModel) Insert(ctx context.Context, data *NhAirdropLog) (sql.Result, error) {
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, nhAirdropLogRowsExpectAutoSet)
ret, err := m.conn.ExecCtx(ctx, query, data.Address, data.Amount, data.Status, data.TxHash, data.SubmitAt, data.BatchDate, data.BatchNum, data.AdminId, data.IsDel)
return ret, err
}
func (m *defaultNhAirdropLogModel) Update(ctx context.Context, data *NhAirdropLog) error {
query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, nhAirdropLogRowsWithPlaceHolder)
_, err := m.conn.ExecCtx(ctx, query, data.Address, data.Amount, data.Status, data.TxHash, data.SubmitAt, data.BatchDate, data.BatchNum, data.AdminId, data.IsDel, data.Id)
return err
}
func (m *defaultNhAirdropLogModel) tableName() string {
return m.table
}

View File

@@ -46,6 +46,7 @@ type ServiceContext struct {
GameReportModel model.NhGameReportModel
RoleModel model.NhRoleModel
GamesPropertyModel model.NhGamesPropertyLogsModel
AirdropModel model.NhAirdropLogModel
ApiKeyCheck rest.Middleware
AdminSecretCheck rest.Middleware
@@ -86,6 +87,7 @@ func NewServiceContext(c config.Config) *ServiceContext {
GameReportModel: model.NewNhGameReportModel(dbConn),
RoleModel: model.NewNhRoleModel(dbConn),
GamesPropertyModel: model.NewNhGamesPropertyLogsModel(dbConn),
AirdropModel: model.NewNhAirdropLogModel(dbConn),
ApiKeyCheck: middleware.NewApiKeyCheckMiddleware(configModel).Handle,
AdminSecretCheck: middleware.NewAdminSecretCheckMiddleware(configModel).Handle,

View File

@@ -29,8 +29,10 @@ func main() {
serviceGroup := service.NewServiceGroup()
defer serviceGroup.Stop()
jb := job.NewJob(ctx)
serviceGroup.Add(jb)
jbs := job.BuildJobs(ctx)
for _, jb := range jbs {
serviceGroup.Add(jb)
}
httpSvr := rest.MustNewServer(c.RestConf, rest.WithCors(), errs.WithUnauthorizedCallback())
handler.RegisterHandlers(httpSvr, ctx)