Merge branch 'dev-lwl/withraw_sync_alipay' into test

# Conflicts:
#	app/mix/dao/mongo.go
#	app/mix/dao/mongo_vas.go
#	dbstruct/vas_mongo.go
This commit is contained in:
lwl0608 2024-12-09 16:20:13 +08:00
commit 87e344c636
11 changed files with 155 additions and 33 deletions

View File

@ -1,6 +1,7 @@
package main
import (
"fmt"
"log"
"net/url"
"os"
@ -13,16 +14,17 @@ func main() {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
u := url.URL{Scheme: "ws", Host: "192.168.0.105:7890", Path: "/ws"} // 使用公共的 echo WebSocket 服务器进行测试
u := url.URL{Scheme: "wss", Host: "wsdebug.tiefen.fun", Path: "/ws"} // 使用公共的 echo WebSocket 服务器进行测试
//u := url.URL{Scheme: "ws", Host: "你的WebSocket服务器地址", Path: "/"} // 替换成你的服务器地址
log.Printf("connecting to %s", u.String())
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
c, resp, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
log.Fatal("dial:", err)
}
defer c.Close()
fmt.Println(resp.StatusCode)
done := make(chan struct{})

View File

@ -92,6 +92,8 @@ func main() {
PrintAndExit(msg)
}
service.DefaultService.Run()
// 连接到审核任务数据库接口
service.DefaultService.ConnectToImageAudit()
service.DefaultService.ConnectToTextAudit()

View File

@ -1,16 +0,0 @@
package controller
import (
"github.com/gin-gonic/gin"
"service/api/errcode"
"service/app/mix/service"
"service/library/logger"
)
func HandleWs(ctx *gin.Context) {
err := service.DefaultService.GetDefaultMelody().HandleRequest(ctx.Writer, ctx.Request)
if err != nil {
logger.Error("HandleRequest fail, err: %v", err)
ReplyErrCodeMsg(ctx, errcode.ErrCodeHandleWsFail)
}
}

View File

@ -98,6 +98,7 @@ const (
COLZoneMomentStat = "zone_moment_stat"
COLUserIncome = "user_income"
COLWithdrawHis = "withdraw_his"
COLAlipayBalance = "alipay_balance"
COLWithdrawFreeze = "withdraw_freeze"
DBCatalog = "catalog"
@ -219,9 +220,6 @@ const (
DBUserMomentVisitOffset = "user_moment_visit_offset"
COLUserMomentVisitOffset = "user_moment_visit_offset"
DBAccountPunishment = "account_punishment"
COLAccountPunishment = "account_punishment"
DBZoneSession = "zone_session"
COLZoneSession = "zone_session"
@ -336,6 +334,11 @@ func (m *Mongo) getColWithdrawHis() *qmgo.Collection {
return m.clientMix.Database(DBVas).Collection(COLWithdrawHis)
}
// 支付宝余额
func (m *Mongo) getColAlipayBalance() *qmgo.Collection {
return m.clientMix.Database(DBVas).Collection(COLAlipayBalance)
}
// 提现冻结
func (m *Mongo) getColWithdrawFreeze() *qmgo.Collection {
return m.clientMix.Database(DBVas).Collection(COLWithdrawFreeze)

View File

@ -2,9 +2,12 @@ package dao
import (
"errors"
"context"
"github.com/gin-gonic/gin"
"github.com/qiniu/qmgo"
"github.com/qiniu/qmgo/operator"
qoptions "github.com/qiniu/qmgo/options"
"go.mongodb.org/mongo-driver/mongo/options"
vasproto "service/api/proto/vas/proto"
"service/dbstruct"
"time"
@ -484,3 +487,50 @@ func (m *Mongo) SetRefundAudit(ctx *gin.Context, ids []string) error {
return nil
}
// 更新支付宝余额
func (m *Mongo) UpsertAlipayBalance(ctx context.Context, id string, balance int64) error {
col := m.getColAlipayBalance()
up := qmgo.M{
operator.SetOnInsert: qmgo.M{
"ct": time.Now().Unix(),
},
operator.Set: qmgo.M{
"balance": balance,
"withdraw_ut": 0,
},
}
err := col.UpdateId(ctx, id, up, qoptions.UpdateOptions{
UpdateHook: nil,
UpdateOptions: options.Update().SetUpsert(true),
})
return err
}
// 更新ut
func (m *Mongo) UpdateAlipayAccountWithdrawUt(ctx *gin.Context, id string) error {
col := m.getColAlipayBalance()
up := qmgo.M{
operator.Set: qmgo.M{
"withdraw_ut": time.Now().Unix(),
},
}
err := col.UpdateId(ctx, id, up)
return err
}
// 获取提现支付宝
func (m *Mongo) GetAvailableWithdrawAlipayAccount(ctx *gin.Context, withdrawAmount int64) (*dbstruct.AlipayAccount, error) {
doc := new(dbstruct.AlipayAccount)
col := m.getColAlipayBalance()
query := qmgo.M{
"balance": qmgo.M{
operator.Gte: withdrawAmount + 100000,
},
}
err := col.Find(ctx, query).Sort("withdraw_ut").One(&doc)
if err != nil {
return nil, err
}
return doc, err
}

View File

@ -0,0 +1,13 @@
package cron
import "service/app/mix/dao"
type Cron struct {
store *dao.Store
}
func NewCron(s *dao.Store) *Cron {
return &Cron{
store: s,
}
}

View File

@ -0,0 +1,43 @@
package cron
import (
"context"
"service/bizcommon/util"
"service/library/logger"
"service/library/payclients/alipaycli"
"strconv"
"time"
)
func (c *Cron) SyncAlipayBalance() {
logger.Info("SyncAlipayBalance Start")
for range time.Tick(time.Second * 5) {
ctx := context.Background()
allClients := alipaycli.GetAllAlipayClients()
for _, client := range allClients {
// 查询支付宝余额
resp, err := client.QueryBalance(ctx)
if err != nil {
logger.Error("QueryBalance fail, resp: %v, err: %v", util.ToJson(resp), err)
continue
}
if resp.Response == nil {
logger.Error("no QueryBalance resp, err: %v", err)
continue
}
balance, err := strconv.ParseFloat(resp.Response.AvailableAmount, 10)
if err != nil {
logger.Error("ParseFloat fail, resp: %v, err: %v", util.ToJson(resp), err)
continue
}
// 更新支付宝余额
err = c.store.UpsertAlipayBalance(ctx, client.AppId, int64(balance*100.0))
if err != nil {
logger.Error("UpsertAlipayBalance fail, err: %v", err)
continue
}
logger.Info("SyncAlipayBalance Success, id: %v, balance: %v", client.AppId, balance)
}
}
}

View File

@ -62,6 +62,7 @@ import (
"service/apollostruct"
"service/app/mix/conf"
"service/app/mix/dao"
"service/app/mix/service/cron"
"service/app/mix/service/logic"
"service/bizcommon/util"
"service/dbstruct"
@ -70,7 +71,6 @@ import (
"service/library/contentaudit/textaudit"
videomoderation "service/library/contentaudit/video_moderation"
"service/library/logger"
"service/library/melody"
"service/library/mycrypto"
"service/library/payclients/alipaycli"
"service/library/payclients/wxpaycli"
@ -179,7 +179,7 @@ var (
)
type Service struct {
defaultMelody *melody.Melody
cron *cron.Cron
}
func NewService() *Service {
@ -204,8 +204,6 @@ func (s *Service) Init(c any) (err error) {
logger.Error("cryptoService init, err: %v", err)
}
s.defaultMelody = melody.New()
err = alipaycli.InitMulti(cfg.Alipay, cfg.AlipayMYTS, cfg.AlipayLX01, cfg.AlipayLX02)
if err != nil {
logger.Error("alipaycli.Init fail, cfg: %v, err: %v", util.ToJson(cfg.Alipay), err)
@ -220,6 +218,8 @@ func (s *Service) Init(c any) (err error) {
yeepaycli.Init(cfg.Yeepay)
s.cron = cron.NewCron(store)
_DefaultToken = logic.NewToken(store, cfg.Crypto)
_DefaultVeriCode = logic.NewVeriCode(store, DefaultZthyService.SendSms)
_DefaultVeriCodeSendTimes = logic.NewVeriCodeSendTimes(store)
@ -309,15 +309,11 @@ func (s *Service) Init(c any) (err error) {
return
}
func (s *Service) GetDefaultMelody() *melody.Melody {
return s.defaultMelody
}
func (s *Service) Run() {
go s.defaultMelody.HandleMessage(func(session *melody.Session, msg []byte) {
logger.Info("recv msg: %s", string(msg))
s.defaultMelody.Broadcast(msg)
})
go func() {
defer logger.Recover()
s.cron.SyncAlipayBalance()
}()
}
func (s *Service) Stop() {

View File

@ -232,3 +232,10 @@ type RefundInfo struct {
Ct int64 `json:"ct" bson:"ct"`
Ut int64 `json:"ut" bson:"ut"`
}
type AlipayAccount struct {
AppId string `json:"appid" bson:"_id"` // 支付宝商户id
Ct int64 `json:"ct" bson:"ct"` // ct
Balance int64 `json:"balance" bson:"balance"` // 余额,单位: 分
WithdrawUt int64 `json:"withdraw_ut" bson:"withdraw_ut"` // 提现ut
}

View File

@ -39,6 +39,10 @@ func GetAlipayClientByAppId(appId string) *AlipayClient {
return allAlipayClients[appId]
}
func GetAllAlipayClients() map[string]*AlipayClient {
return allAlipayClients
}
type AlipayClient struct {
*alipay.Client
alipayPublicCertPath string
@ -261,3 +265,13 @@ func (c *AlipayClient) RefundOne(ctx context.Context, param *RefundOneParam) (re
}
return
}
func (c *AlipayClient) QueryBalance(ctx context.Context) (resp *alipay.DataBillBalanceQueryResponse, err error) {
bm := gopay.BodyMap{}
resp, err = c.DataBillBalanceQuery(ctx, bm)
logger.Info("DataBillBalanceQuery bm: %v, resp: %v", util.ToJson(bm), util.ToJson(resp))
if err != nil {
return
}
return
}

View File

@ -129,3 +129,11 @@ func TestFillOid3(t *testing.T) {
f.Write(content)
}
}
func TestAlipayClient_QueryBills(t *testing.T) {
cliMYTS := GetAlipayClientByAppId(AppIdMiYuanTianShi)
resp, _ := cliMYTS.QueryBills(context.Background(), nil)
//t.Log(err.Error())
t.Log(util.ToJson(resp.Response))
fmt.Println(resp.Response.TotalAmount)
}