service/app/mix/dao/mysql.go

996 lines
31 KiB
Go

package dao
import (
"database/sql"
"errors"
"fmt"
"service/app/mix/conf"
"service/bizcommon/util"
"service/dbstruct"
"service/library/logger"
"service/library/mysqldb"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/jmoiron/sqlx"
goproto "google.golang.org/protobuf/proto"
)
type Mysql struct {
dbVas *sqlx.DB
dbXxlJob *sqlx.DB
}
func (m *Mysql) getDBVas() *sqlx.DB {
return m.dbVas
}
func (m *Mysql) getDBXxlJob() *sqlx.DB {
return m.dbXxlJob
}
func (m *Mysql) VasBegin(ctx *gin.Context) (tx *sqlx.Tx, err error) {
tx, err = m.dbVas.BeginTxx(ctx, nil)
if err != nil {
logger.Error("beginx fail, err: %v", err)
return
}
if tx == nil {
err = errors.New("get tx fail")
logger.Error("nil tx, err: %v", err)
return
}
return
}
func (m *Mysql) DealTxCR(tx *sqlx.Tx, errIn error) (errOut error) {
if errIn != nil {
// 事务回滚
logger.Error("deal tx err, %v", errIn)
errOut = tx.Rollback()
if errOut != nil {
logger.Error("tx rollback fail, err: %v", errOut)
return
}
} else {
// 提交事务
errOut = tx.Commit()
if errOut != nil {
logger.Error("tx commit fail, err: %v", errOut)
return
}
}
return
}
// mysql
const (
DatabaseVas = "vas"
TableOrder = "vas_order" // 订单表
TableWallet = "vas_wallet" // 钱包
TableCoinOrder = "vas_coin_order" // 金币订单
TableConsumeHistoryCost = "vas_ch_cost" // 消费明细
TableConsumeHistoryCharge = "vas_ch_charge" // 充值明细
TableConsumeHistoryIncome = "vas_ch_income" // 收入明细
TableConsumeHistoryWithdraw = "vas_ch_withdraw" // 提现明细
TableVasUserUnlock = "vas_user_unlock" // 用增解锁
TableWithdrawOrder = "vas_withdraw_order" // 提现订单表
TableWithdrawDiamondsHis = "vas_withdraw_diamonds_his" // 提现金币历史
TableVasUserMembershipUnlock = "vas_user_membership_unlock" // 会员资格解锁
DatabaseXxlJob = "xxl_job"
TableXxlJobLog = "xxl_job_log" // xxl_job日志表
)
func (m *Mysql) ChTableName(ch *dbstruct.ConsumeHistory) (string, error) {
switch ch.GetType() {
case dbstruct.CHTypeCost:
return TableConsumeHistoryCost, nil
case dbstruct.CHTypeCharge:
return TableConsumeHistoryCharge, nil
case dbstruct.CHTypeIncome:
return TableConsumeHistoryIncome, nil
case dbstruct.CHTypeWithdraw:
return TableConsumeHistoryWithdraw, nil
default:
return "", errors.New(fmt.Sprintf("invalid ch type: %v", *ch.Type))
}
}
func NewMysql(cfg *conf.ConfigSt) (mysql *Mysql, err error) {
mysql = new(Mysql)
mysql.dbVas, err = mysqldb.NewMysqlDB(cfg.MixMysql, DatabaseVas)
if err != nil {
logger.Error("NewMysqlDB fail, cfg: %v, db: %v, err: %v", util.ToJson(cfg.MixMongo), DatabaseVas, err)
return
}
mysql.dbVas.SetMaxOpenConns(20)
mysql.dbVas.SetMaxIdleConns(10)
err = mysql.dbVas.Ping()
if err != nil {
logger.Error("MysqlDB ping fail, cfg: %v, db: %v, err: %v", util.ToJson(cfg.MixMongo), DatabaseVas, err)
return
}
mysql.dbXxlJob, err = mysqldb.NewMysqlDB(cfg.MixMysql, DatabaseXxlJob)
if err != nil {
logger.Error("NewMysqlDB fail, cfg: %v, db: %v, err: %v", util.ToJson(cfg.MixMongo), DatabaseXxlJob, err)
return
}
mysql.dbXxlJob.SetMaxOpenConns(20)
mysql.dbXxlJob.SetMaxIdleConns(10)
err = mysql.dbXxlJob.Ping()
if err != nil {
logger.Error("MysqlDB ping fail, cfg: %v, db: %v, err: %v", util.ToJson(cfg.MixMongo), DatabaseXxlJob, err)
return
}
return
}
// 创建订单
func (m *Mysql) CreateOrder(ctx *gin.Context, tx *sqlx.Tx, order *dbstruct.Order) error {
var err error
sqlStr := "insert into " + TableOrder +
" (id, mid, uid, product_id, pay_type, pay_amount, oid1, oid2, oid3, " +
" out_order_id, receipt_data, coins, order_status, order_from, " +
" ct, ut, ext, b_did, b_ver, b_osver, b_dt, b_ch, b_model, b_nt, ip) " +
" values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) "
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr,
order.GetID(), order.GetMid(), order.GetUid(), order.GetProductId(), order.GetPayType(), order.GetPayAmount(), order.GetOid1(), order.GetOid2(), order.GetOid3(),
order.GetOutOrderID(), order.GetReceiptData(), order.GetCoins(), order.GetOrderStatus(), order.GetOrderFrom(),
order.GetCt(), order.GetUt(), order.GetExt(), order.GetDid(), order.GetVersion(), order.GetOsVersion(), order.GetDevType(), order.GetChannel(), order.GetModel(), order.GetNetType(), order.GetIp(),
)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr,
order.GetID(), order.GetMid(), order.GetUid(), order.GetProductId(), order.GetPayType(), order.GetPayAmount(), order.GetOid1(), order.GetOid2(), order.GetOid3(),
order.GetOutOrderID(), order.GetReceiptData(), order.GetCoins(), order.GetOrderStatus(), order.GetOrderFrom(),
order.GetCt(), order.GetUt(), order.GetExt(), order.GetDid(), order.GetVersion(), order.GetOsVersion(), order.GetDevType(), order.GetChannel(), order.GetModel(), order.GetNetType(), order.GetIp(),
)
}
if err != nil {
logger.Error("CreateOrder fail, order: %v, err: %v", order.ToString(), err)
return err
}
return err
}
// 获取订单
func (m *Mysql) GetOrderById(ctx *gin.Context, tx *sqlx.Tx, id string) (order *dbstruct.Order, err error) {
var tmpOrder dbstruct.Order
if tx != nil {
err = tx.GetContext(ctx, &tmpOrder, fmt.Sprintf("select * from %s where id = ?", TableOrder), id)
} else {
db := m.getDBVas()
err = db.GetContext(ctx, &tmpOrder, fmt.Sprintf("select * from %s where id = ?", TableOrder), id)
}
if err != nil {
return
}
order = &tmpOrder
return
}
// 获取订单
func (m *Mysql) GetOrderByOutOrderId(ctx *gin.Context, tx *sqlx.Tx, outOrderId string) (order *dbstruct.Order, err error) {
var tmpOrder dbstruct.Order
if tx != nil {
err = tx.GetContext(ctx, &tmpOrder, fmt.Sprintf("select * from %s where out_order_id = ?", TableOrder), outOrderId)
} else {
db := m.getDBVas()
err = db.GetContext(ctx, &tmpOrder, fmt.Sprintf("select * from %s where out_order_id = ?", TableOrder), outOrderId)
}
if err != nil {
return
}
order = &tmpOrder
return
}
// 获取订单for update
func (m *Mysql) GetOrderByIdForUpdate(ctx *gin.Context, tx *sqlx.Tx, id string) (order *dbstruct.Order, err error) {
var tmpOrder dbstruct.Order
err = tx.GetContext(ctx, &tmpOrder, fmt.Sprintf("select * from %s where id = ? for update", TableOrder), id)
if err != nil {
return
}
order = &tmpOrder
return
}
// op 获取订单
func (m *Mysql) GetOrdersByMid(ctx *gin.Context, tx *sqlx.Tx, mid int64, offset, limit int, status int32) (list []*dbstruct.Order, err error) {
list = make([]*dbstruct.Order, 0)
sqlStr := fmt.Sprintf("select * from %s where mid=? limit ? offset ?", TableOrder)
if status > dbstruct.VasOrderStatusNone {
sqlStr = fmt.Sprintf("select * from %s where mid=? and status=%d limit ? offset ?", TableOrder, status)
}
if tx != nil {
err = tx.SelectContext(ctx, &list, sqlStr, mid, limit, offset)
} else {
db := m.getDBVas()
err = db.SelectContext(ctx, &list, sqlStr, mid, limit, offset)
}
if err == sql.ErrNoRows {
err = nil
return
}
if err != nil {
return
}
return
}
// 更新订单状态
func (m *Mysql) UpdateOrderStatus(ctx *gin.Context, tx *sqlx.Tx, orderId string, preStatus, aftStatus int32) error {
var err error
sqlStr := "update " + TableOrder + " set order_status=? where id=? and order_status=?"
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr, aftStatus, orderId, preStatus)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr, aftStatus, orderId, preStatus)
}
if err != nil {
logger.Error("UpdateOrderStatus fail, orderId: %v, preStatus: %v, aftStatus: %v, err: %v", orderId, preStatus, aftStatus, err)
return err
}
return err
}
// 更新订单out_order_id
func (m *Mysql) UpdateOutOrderId(ctx *gin.Context, tx *sqlx.Tx, orderId string, outOrderId string) error {
var err error
sqlStr := "update " + TableOrder + " set out_order_id=? where id=?"
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr, outOrderId, orderId)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr, outOrderId, orderId)
}
if err != nil {
logger.Error("UpdateOutOrderId fail, orderId: %v, outOrderId: %v, err: %v", orderId, outOrderId, err)
return err
}
return err
}
// 获取钱包 for update
func (m *Mysql) GetWalletForUpdate(ctx *gin.Context, tx *sqlx.Tx, mid int64) (wallet *dbstruct.Wallet, err error) {
var tmpWallet dbstruct.Wallet
err = tx.GetContext(ctx, &tmpWallet, fmt.Sprintf("select * from %s where id = ? for update", TableWallet), mid)
if err != nil {
return
}
wallet = &tmpWallet
return
}
// 新建钱包
func (m *Mysql) CreateWallet(ctx *gin.Context, tx *sqlx.Tx, mid int64) error {
var err error
sqlStr := "insert into " + TableWallet +
" (id, coins, diamonds, withdraw_diamonds) " +
" values (?, ?, ?, ?)"
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr, mid, 0, 0, 0)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr, mid, 0, 0, 0)
}
if err != nil {
logger.Error("CreateWallet fail, mid: %v, err: %v", mid, err)
return err
}
return nil
}
// 获取钱包
func (m *Mysql) GetWalletByMid(ctx *gin.Context, mid int64) (wallet *dbstruct.Wallet, err error) {
var (
db = m.getDBVas()
tmpWallet dbstruct.Wallet
)
err = db.GetContext(ctx, &tmpWallet, fmt.Sprintf("select * from %s where id = ?", TableWallet), mid)
if err != nil {
return
}
wallet = &tmpWallet
return
}
// 增加金币
func (m *Mysql) IncCoins(ctx *gin.Context, tx *sqlx.Tx, mid, coins int64) error {
var err error
sqlStr := "update " + TableWallet + " set coins = coins + ? where id = ?"
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr,
coins, mid,
)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr,
coins, mid,
)
}
if err != nil {
logger.Error("IncCoins fail, mid: %v, coins: %v, err: %v", mid, coins, err)
return err
}
return err
}
// 扣金币
func (m *Mysql) DecCoins(ctx *gin.Context, tx *sqlx.Tx, mid, coins int64) error {
var err error
sqlStr := "update " + TableWallet + " set coins = coins - ? where id = ?"
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr,
coins, mid,
)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr,
coins, mid,
)
}
if err != nil {
logger.Error("IncCoins fail, mid: %v, coins: %v, err: %v", mid, coins, err)
return err
}
return err
}
// 增加钻石
func (m *Mysql) IncDiamonds(ctx *gin.Context, tx *sqlx.Tx, mid, dias int64) error {
var err error
sqlStr := "update " + TableWallet + " set diamonds = diamonds + ? where id = ?"
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr,
dias, mid,
)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr,
dias, mid,
)
}
if err != nil {
logger.Error("IncCoins fail, mid: %v, dias: %v, err: %v", mid, dias, err)
return err
}
return err
}
// 增加提现钻石
func (m *Mysql) IncWithdrawDiamonds(ctx *gin.Context, tx *sqlx.Tx, mid, dias int64) error {
var err error
sqlStr := "update " + TableWallet + " set withdraw_diamonds=withdraw_diamonds+? where id=?"
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr, dias, mid)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr, dias, mid)
}
if err != nil {
logger.Error("IncWithdrawDiamonds fail, mid: %v, dias: %v, err: %v", mid, dias, err)
return err
}
return err
}
// 扣提现钻石
func (m *Mysql) DecWithdrawDiamonds(ctx *gin.Context, tx *sqlx.Tx, mid, dias int64) error {
var err error
sqlStr := "update " + TableWallet + " set withdraw_diamonds=withdraw_diamonds-? where id=?"
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr, dias, mid)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr, dias, mid)
}
if err != nil {
logger.Error("DecWithdrawDiamonds fail, mid: %v, dias: %v, err: %v", mid, dias, err)
return err
}
return err
}
// 创建金币订单
func (m *Mysql) CreateCoinOrder(ctx *gin.Context, tx *sqlx.Tx, order *dbstruct.CoinOrder) error {
var err error
sqlStr := "insert into " + TableCoinOrder +
" (id, mid, uid, product_id, " +
" coins, order_status, order_from, " +
" ct, ut, ext, b_did, b_ver, b_osver, " +
" b_dt, b_ch, b_model, b_nt, ip) " +
" values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) "
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr,
order.GetID(), order.GetMid(), order.GetUid(), order.GetProductId(),
order.GetCoins(), order.GetOrderStatus(), order.GetOrderFrom(),
order.GetCt(), order.GetUt(), order.GetExt(), order.GetDid(), order.GetVersion(), order.GetOsVersion(),
order.GetDevType(), order.GetChannel(), order.GetModel(), order.GetNetType(), order.GetIp(),
)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr,
order.GetID(), order.GetMid(), order.GetUid(), order.GetProductId(),
order.GetCoins(), order.GetOrderStatus(), order.GetOrderFrom(),
order.GetCt(), order.GetUt(), order.GetExt(), order.GetDid(), order.GetVersion(), order.GetOsVersion(),
order.GetDevType(), order.GetChannel(), order.GetModel(), order.GetNetType(), order.GetIp(),
)
}
if err != nil {
logger.Error("CreateCoinOrder fail, order: %v, err: %v", order.ToString(), err)
return err
}
return err
}
// 获取金币订单
func (m *Mysql) GetCoinOrderById(ctx *gin.Context, tx *sqlx.Tx, id string) (order *dbstruct.CoinOrder, err error) {
var tmpOrder dbstruct.CoinOrder
if tx != nil {
err = tx.GetContext(ctx, &tmpOrder, fmt.Sprintf("select * from %s where id = ?", TableCoinOrder), id)
} else {
db := m.getDBVas()
err = db.GetContext(ctx, &tmpOrder, fmt.Sprintf("select * from %s where id = ?", TableCoinOrder), id)
}
if err != nil {
return
}
order = &tmpOrder
return
}
// 获取金币订单
func (m *Mysql) GetCoinOrders(ctx *gin.Context, tx *sqlx.Tx, mid, st, et int64) (list []*dbstruct.CoinOrder, err error) {
list = make([]*dbstruct.CoinOrder, 0)
sqlStr := fmt.Sprintf("select * from %s where ct>=%d and ct<%d", TableCoinOrder, st, et)
if mid > 0 {
sqlStr = fmt.Sprintf("select * from %s where mid=%d", TableCoinOrder, mid)
}
if tx != nil {
err = tx.SelectContext(ctx, &list, sqlStr)
} else {
db := m.getDBVas()
err = db.SelectContext(ctx, &list, sqlStr)
}
if err != nil {
return
}
return
}
// 更新订单状态
func (m *Mysql) UpdateCoinOrderStatus(ctx *gin.Context, tx *sqlx.Tx, orderId string, status int32) error {
var err error
sqlStr := "update " + TableCoinOrder + " set order_status=?,ut=? where id=?"
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr, status, time.Now().Unix(), orderId)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr, status, time.Now().Unix(), orderId)
}
if err != nil {
logger.Error("UpdateCoinOrderStatus fail, orderId: %v, status: %v, err: %v", orderId, status, err)
return err
}
return err
}
// 获取金币订单 for update
func (m *Mysql) GetCoinOrderByIdForUpdate(ctx *gin.Context, tx *sqlx.Tx, id string) (order *dbstruct.CoinOrder, err error) {
var tmpOrder dbstruct.CoinOrder
err = tx.GetContext(ctx, &tmpOrder, fmt.Sprintf("select * from %s where id = ? for update", TableCoinOrder), id)
if err != nil {
return
}
order = &tmpOrder
return
}
// 金币订单填写联系方式
func (m *Mysql) FillCoinOderContact(ctx *gin.Context, tx *sqlx.Tx, id, qq, wechat, phone, addr, note string) error {
var err error
sqlStr := "update " + TableCoinOrder + " set qq=?, wechat=?, phone=?, addr=?, note=?, order_status=? where id=?"
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr,
qq, wechat, phone, addr, note, dbstruct.VasCoinOrderStatusWaitDeal, id,
)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr,
qq, wechat, phone, addr, note, dbstruct.VasCoinOrderStatusWaitDeal, id,
)
}
if err != nil {
return err
}
return nil
}
// todo 获取待结算的订单
func (m *Mysql) Get(ctx *gin.Context, tx *sqlx.Tx, id string) (order *dbstruct.CoinOrder, err error) {
var tmpOrder dbstruct.CoinOrder
err = tx.GetContext(ctx, &tmpOrder, fmt.Sprintf("select * from %s where id = ? for update", TableCoinOrder), id)
if err != nil {
return
}
order = &tmpOrder
return
}
// 待添加微信的订单
func (m *Mysql) GetWaitWechatAddCoinOrders(ctx *gin.Context, tx *sqlx.Tx, mid int64, statusList []int32, offset, limit int) (list []*dbstruct.CoinOrder, err error) {
list = make([]*dbstruct.CoinOrder, 0)
sqlStr := fmt.Sprintf("select * from %s where uid=? and product_id=? and order_status in (%s) limit ? offset ?", TableCoinOrder, util.Convert2SqlArr(statusList))
if tx != nil {
err = tx.SelectContext(ctx, &list, sqlStr, mid, dbstruct.ProductIdContactWechat, limit, offset)
} else {
db := m.getDBVas()
err = db.SelectContext(ctx, &list, sqlStr, mid, dbstruct.ProductIdContactWechat, limit, offset)
}
if err != nil {
return
}
return
}
// 确定推荐微信
func (m *Mysql) ConfirmAddWechat(ctx *gin.Context, tx *sqlx.Tx, coinOrderId string) (err error) {
sqlStr := "update " + TableCoinOrder + " set order_status=? where id=? and order_status=?"
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr, dbstruct.VasCoinOrderStatusDeal, coinOrderId, dbstruct.VasCoinOrderStatusWaitDeal)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr, dbstruct.VasCoinOrderStatusDeal, coinOrderId, dbstruct.VasCoinOrderStatusWaitDeal)
}
if err != nil {
return
}
return
}
// 创建消费历史
func (m *Mysql) CreateConsumeHistory(ctx *gin.Context, tx *sqlx.Tx, ch *dbstruct.ConsumeHistory) error {
var (
timeNow = time.Now().Unix()
tableName, err = m.ChTableName(ch)
)
if err != nil {
return err
}
sqlStr := "insert into " + tableName +
" (mid, uid, did, `type`, stype, type_id, " +
" `order_id`, `change`, `before`, `after`, `count`, ct) " +
" values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) "
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr,
ch.Mid, ch.Uid, ch.Did, ch.Type, ch.SType, ch.TypeId,
ch.OrderId, ch.Change, ch.Before, ch.After, ch.Count, timeNow,
//ch.GetMid(), ch.GetUid(), ch.GetDid(), ch.GetType(), ch.GetSType(), ch.GetTypeId(),
//ch.GetOrderId(), ch.GetChange(), ch.GetBefore(), ch.GetAfter(), ch.GetCount(), timeNow,
)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr,
ch.Mid, ch.Uid, ch.Did, ch.Type, ch.SType, ch.TypeId,
ch.OrderId, ch.Change, ch.Before, ch.After, ch.Count, timeNow,
)
}
if err != nil {
logger.Error("CreateConsumeHistory fail, ch: %v, err: %v", util.ToJson(ch), err)
return err
}
return err
}
// 创建解锁记录
func (m *Mysql) CreateUserVasUnlock(ctx *gin.Context, tx *sqlx.Tx, uu *dbstruct.UserVasUnlock) error {
var (
err error
timeNow = time.Now().Unix()
)
sqlStr := "insert into " + TableVasUserUnlock +
" (mid, uid, product_id, ct, lock_type, status, means, order_id) " +
" values (?, ?, ?, ?, ?, ?, ?, ?) "
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr,
uu.Mid, uu.Uid, uu.ProductId, timeNow, uu.LockType, uu.Status, uu.Means, uu.OrderId,
)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr,
uu.Mid, uu.Uid, uu.ProductId, timeNow, uu.LockType, uu.Status, uu.Means, uu.OrderId,
)
}
if err != nil {
logger.Error("CreateCoinOrder fail, uu: %v, err: %v", util.ToJson(uu), err)
return err
}
return err
}
// 获取解锁记录
func (m *Mysql) GetUserVasUnlock(ctx *gin.Context, tx *sqlx.Tx, mid, uid int64, productId string) (uu *dbstruct.UserVasUnlock, err error) {
var uuTmp dbstruct.UserVasUnlock
sqlStr := fmt.Sprintf("select * from %s where mid=? and uid=? and product_id=?", TableVasUserUnlock)
fmt.Println(sqlStr)
if tx != nil {
err = tx.GetContext(ctx, &uuTmp, sqlStr, mid, uid, productId)
} else {
db := m.getDBVas()
err = db.GetContext(ctx, &uuTmp, sqlStr, mid, uid, productId)
}
if err != nil {
return
}
uu = &uuTmp
return
}
// 创建解锁记录
func (m *Mysql) CreateUserVasMembershipUnlock(ctx *gin.Context, tx *sqlx.Tx, uu *dbstruct.UserVasMembershipUnlock) error {
var (
err error
timeNow = time.Now().Unix()
)
sqlStr := "insert into " + TableVasUserMembershipUnlock +
" (mid, product_id, ct, means, order_id) " +
" values (?, ?, ?, ?, ?) "
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr,
uu.Mid, uu.ProductId, timeNow, uu.Means, uu.OrderId,
)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr,
uu.Mid, uu.ProductId, timeNow, uu.Means, uu.OrderId,
)
}
if err != nil {
logger.Error("CreateUserVasMembershipUnlock fail, uu: %v, err: %v", util.ToJson(uu), err)
return err
}
return err
}
// 获取会员资格解锁记录
func (m *Mysql) GetUserVasMembershipUnlock(ctx *gin.Context, tx *sqlx.Tx, mid int64, productId string) (uu *dbstruct.UserVasMembershipUnlock, err error) {
var uuTmp dbstruct.UserVasMembershipUnlock
sqlStr := fmt.Sprintf("select * from %s where mid=? and product_id=?", TableVasUserMembershipUnlock)
fmt.Println(sqlStr)
if tx != nil {
err = tx.GetContext(ctx, &uuTmp, sqlStr, mid, productId)
} else {
db := m.getDBVas()
err = db.GetContext(ctx, &uuTmp, sqlStr, mid, productId)
}
if err != nil {
return
}
uu = &uuTmp
return
}
// 获取解锁记录
func (m *Mysql) GetUnlockWechatList(ctx *gin.Context, tx *sqlx.Tx, mid int64, offset, limit int) (list []*dbstruct.UserVasUnlock, err error) {
list = make([]*dbstruct.UserVasUnlock, 0)
sqlStr := fmt.Sprintf("select * from %s where mid=? limit ? offset ? ", TableVasUserUnlock)
if tx != nil {
err = tx.SelectContext(ctx, &list, sqlStr, mid, limit, offset)
} else {
db := m.getDBVas()
err = db.SelectContext(ctx, &list, sqlStr, mid, limit, offset)
}
if err != nil {
return
}
return
}
// 获取消费历史
func (m *Mysql) GetUCHList(ctx *gin.Context, tx *sqlx.Tx, mid int64, typ int32, offset, limit int) (list []*dbstruct.ConsumeHistory, err error) {
list = make([]*dbstruct.ConsumeHistory, 0)
tableName, err := m.ChTableName(&dbstruct.ConsumeHistory{Type: goproto.Int32(typ)})
sqlStr := fmt.Sprintf("select * from %s where mid=? limit ? offset ?", tableName)
if tx != nil {
err = tx.SelectContext(ctx, &list, sqlStr, mid, limit, offset)
} else {
db := m.getDBVas()
err = db.SelectContext(ctx, &list, sqlStr, mid, limit, offset)
}
if err != nil {
return
}
return
}
// 获取收入历史
func (m *Mysql) GetIncomeCHList(ctx *gin.Context, tx *sqlx.Tx, orderId string) (list []*dbstruct.ConsumeHistory, err error) {
list = make([]*dbstruct.ConsumeHistory, 0)
tableName, err := m.ChTableName(&dbstruct.ConsumeHistory{Type: goproto.Int32(dbstruct.CHTypeIncome)})
sqlStr := fmt.Sprintf("select * from %s where order_id=?", tableName)
if tx != nil {
err = tx.SelectContext(ctx, &list, sqlStr, orderId)
} else {
db := m.getDBVas()
err = db.SelectContext(ctx, &list, sqlStr, orderId)
}
if err != nil {
return
}
return
}
// 获取订单
func (m *Mysql) GetOrderCountGroupByStatus(ctx *gin.Context, tx *sqlx.Tx, orderStatuses []int32, ctStart *int64, ctEnd *int64) (list []*dbstruct.VasOrderStatusCount, err error) {
var sql strings.Builder
var args []interface{}
sql.WriteString(fmt.Sprintf("select order_status, count(1) as count from %s where 1 = 1", TableOrder))
if ctStart != nil {
sql.WriteString(" and ct >= ?")
args = append(args, util.DerefInt64(ctStart))
}
if ctEnd != nil {
sql.WriteString(" and ct <= ?")
args = append(args, util.DerefInt64(ctEnd))
}
if len(orderStatuses) > 0 {
inClause := strings.Builder{}
inClause.WriteString("?")
args = append(args, orderStatuses[0])
for i := 1; i < len(orderStatuses); i++ {
inClause.WriteString(",?")
args = append(args, orderStatuses[i])
}
sql.WriteString(fmt.Sprintf(" and order_status in (%s)", inClause.String()))
}
sql.WriteString(" group by order_status")
list = make([]*dbstruct.VasOrderStatusCount, 0)
if tx != nil {
err = tx.SelectContext(ctx, &list, sql.String(), args...)
} else {
db := m.getDBVas()
err = db.SelectContext(ctx, &list, sql.String(), args...)
}
if err != nil {
return
}
return
}
// 创建提现订单
func (m *Mysql) CreateWithdrawOrder(ctx *gin.Context, tx *sqlx.Tx, wOrder *dbstruct.WithdrawOrder) error {
var err error
sqlStr := "insert into " + TableWithdrawOrder +
" (id, mid, did, apply_time, alipay_id, alipay_name, " +
" withdraw_dias, withdraw_money, ip, order_status, operator, op_time) " +
" values (?,?,?,?,?,?,?,?,?,?,?,?) "
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr,
wOrder.GetID(), wOrder.GetMid(), wOrder.GetDid(), wOrder.GetApplyTime(), wOrder.GetAlipayId(), wOrder.GetAlipayName(),
wOrder.GetWithdrawDias(), wOrder.GetWithdrawMoney(), wOrder.GetIp(), wOrder.GetOrderStatus(), wOrder.GetOperator(), wOrder.GetOpTime(),
)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr,
wOrder.GetID(), wOrder.GetMid(), wOrder.GetApplyTime(), wOrder.GetDid(), wOrder.GetAlipayId(), wOrder.GetAlipayName(),
wOrder.GetWithdrawDias(), wOrder.GetWithdrawMoney(), wOrder.GetIp(), wOrder.GetOrderStatus(), wOrder.GetOperator(), wOrder.GetOpTime(),
)
}
if err != nil {
logger.Error("CreateOrder fail, wOrder: %v, err: %v", wOrder.ToString(), err)
return err
}
return err
}
// 更新提现订单状态
func (m *Mysql) UpdateWithdrawOrderStatus(ctx *gin.Context, tx *sqlx.Tx, orderId string, preStatus, aftStatus int32) error {
var err error
sqlStr := "update " + TableWithdrawOrder + " set order_status=? where id=? and order_status=?"
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr, aftStatus, orderId, preStatus)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr, aftStatus, orderId, preStatus)
}
if err != nil {
logger.Error("UpdateWithdrawOrderStatus fail, orderId: %v, preStatus: %v, aftStatus: %v, err: %v", orderId, preStatus, aftStatus, err)
return err
}
return err
}
// 获取提现订单
func (m *Mysql) GetWithdrawOrdersByMid(ctx *gin.Context, tx *sqlx.Tx, mid, st, et int64) (list []*dbstruct.WithdrawOrder, err error) {
list = make([]*dbstruct.WithdrawOrder, 0)
sqlStr := fmt.Sprintf("select * from %s where mid=? and apply_time>=? and apply_time<?", TableWithdrawOrder)
if tx != nil {
err = tx.SelectContext(ctx, &list, sqlStr, mid, st, et)
} else {
db := m.getDBVas()
err = db.SelectContext(ctx, &list, sqlStr, mid, st, et)
}
if err == sql.ErrNoRows {
err = nil
return
}
if err != nil {
return
}
return
}
// 获取指定任务中所有已经执行成功的xxl_job任务
func (m *Mysql) GetSuccessXxlJobLogs(ctx *gin.Context, tx *sqlx.Tx, jobIdsStr string, errorPrefix string) (list []*dbstruct.XxlJobLog, err error) {
list = make([]*dbstruct.XxlJobLog, 0)
sqlStr := fmt.Sprintf("select * from %s where job_id in (%s) and trigger_code = 200 and handle_code = 200 and trigger_msg not like ?", TableXxlJobLog, jobIdsStr)
if tx != nil {
err = tx.SelectContext(ctx, &list, sqlStr, "%"+errorPrefix)
} else {
db := m.getDBXxlJob()
err = db.SelectContext(ctx, &list, sqlStr, "%"+errorPrefix)
}
if err == sql.ErrNoRows {
err = nil
return
}
if err != nil {
return
}
return
}
// 更新指定任务的任务信息
func (m *Mysql) UpdateXxlJobLog(ctx *gin.Context, tx *sqlx.Tx, xxlJobLog *dbstruct.XxlJobLog) (err error) {
setClauses := make([]string, 0)
setValue := make([]any, 0)
if xxlJobLog.JobGroup != nil {
setClauses = append(setClauses, " job_group = ? ")
setValue = append(setValue, xxlJobLog.GetJobGroup())
}
if xxlJobLog.JobId != nil {
setClauses = append(setClauses, " job_id = ? ")
setValue = append(setValue, xxlJobLog.GetJobId())
}
if xxlJobLog.ExecutorAddress != nil {
setClauses = append(setClauses, " executor_address = ? ")
setValue = append(setValue, xxlJobLog.GetExecutorAddress())
}
if xxlJobLog.ExecutorHandler != nil {
setClauses = append(setClauses, " executor_handler = ? ")
setValue = append(setValue, xxlJobLog.GetExecutorHandler())
}
if xxlJobLog.ExecutorParam != nil {
setClauses = append(setClauses, " executor_param = ? ")
setValue = append(setValue, xxlJobLog.GetExecutorParam())
}
if xxlJobLog.ExecutorShardingParam != nil {
setClauses = append(setClauses, " executor_sharding_param = ? ")
setValue = append(setValue, xxlJobLog.GetExecutorShardingParam())
}
if xxlJobLog.ExecutorFailRetryCount != nil {
setClauses = append(setClauses, " executor_fail_retry_count = ? ")
setValue = append(setValue, xxlJobLog.GetExecutorFailRetryCount())
}
if xxlJobLog.TriggerTime != nil {
setClauses = append(setClauses, " trigger_time = ? ")
setValue = append(setValue, xxlJobLog.GetTriggerTime())
}
if xxlJobLog.TriggerCode != nil {
setClauses = append(setClauses, " trigger_code = ? ")
setValue = append(setValue, xxlJobLog.GetTriggerCode())
}
if xxlJobLog.TriggerMsg != nil {
setClauses = append(setClauses, " trigger_msg = ? ")
setValue = append(setValue, xxlJobLog.GetTriggerMsg())
}
if xxlJobLog.HandleTime != nil {
setClauses = append(setClauses, " handle_time = ? ")
setValue = append(setValue, xxlJobLog.GetHandleTime())
}
if xxlJobLog.HandleCode != nil {
setClauses = append(setClauses, " handle_code = ? ")
setValue = append(setValue, xxlJobLog.GetHandleCode())
}
if xxlJobLog.HandleMsg != nil {
setClauses = append(setClauses, " handle_msg = ? ")
setValue = append(setValue, xxlJobLog.GetHandleMsg())
}
if xxlJobLog.AlarmStatus != nil {
setClauses = append(setClauses, " alarm_status = ? ")
setValue = append(setValue, xxlJobLog.GetAlarmStatus())
}
if len(setValue) == 0 {
return
}
setValue = append(setValue, xxlJobLog.GetId())
setClause := &strings.Builder{}
for i, clause := range setClauses {
if i > 0 {
setClause.WriteString(",")
}
setClause.WriteString(clause)
}
sqlStr := fmt.Sprintf("update %s set %s where id = ?", TableXxlJobLog, setClause)
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr, setValue...)
} else {
db := m.getDBXxlJob()
_, err = db.ExecContext(ctx, sqlStr, setValue...)
}
if err == sql.ErrNoRows {
err = nil
return
}
if err != nil {
return
}
return
}
// 删除指定任务中所有已经执行成功的xxl_job任务
func (m *Mysql) DeleteSuccessXxlJobLogs(ctx *gin.Context, tx *sqlx.Tx, ids []int64) (err error) {
if len(ids) == 0 {
return
}
sqlStr := fmt.Sprintf("delete from %s where id in (%s)", TableXxlJobLog, util.Convert2SqlArr(ids))
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr)
} else {
db := m.getDBXxlJob()
_, err = db.ExecContext(ctx, sqlStr)
}
if err == sql.ErrNoRows {
err = nil
return
}
if err != nil {
return
}
return
}
// 创建提现订单
func (m *Mysql) CreateWithdrawDiamondsHis(ctx *gin.Context, tx *sqlx.Tx, h *dbstruct.WithdrawDiamondsHis) error {
var err error
sqlStr := "insert into " + TableWithdrawDiamondsHis +
" (mid, income_ch_id, order_id, ct, before_withdraw_diamonds, after_withdraw_diamonds, `change`, product_id) " +
" values (?,?,?,?,?,?,?,?) "
if tx != nil {
_, err = tx.ExecContext(ctx, sqlStr,
h.GetMid(), h.GetIncomeChId(), h.GetOrderId(), h.GetCt(), h.GetBeforeWithdrawDiamonds(), h.GetAfterWithdrawDiamonds(), h.GetChange(), h.GetProductId(),
)
} else {
db := m.getDBVas()
_, err = db.ExecContext(ctx, sqlStr,
h.GetMid(), h.GetIncomeChId(), h.GetOrderId(), h.GetCt(), h.GetBeforeWithdrawDiamonds(), h.GetAfterWithdrawDiamonds(), h.GetChange(), h.GetProductId(),
)
}
if err != nil {
logger.Error("CreateWithdrawDiamondsHis fail, h: %v, err: %v", util.ToJson(h), err)
return err
}
return err
}