StakeholderIncomeSyncCommand.php 7.1 KB
<?php

namespace App\Console\Commands;

use App\Helper\CacheKeyTools;
use App\Helper\RedisClient;
use App\Models\Legal\Bills;
use App\Models\Legal\Company;
use App\Models\Legal\StakeholderIncomeSyncApp;
use App\Models\Legal\StakeholderIncomeSyncAppDetails;
use App\Models\Legal\StakeholderIncomeSyncAppLogs;
use App\Models\Legal\Subject;
use App\Services\ApiService;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;

/**
 * Stakeholder income synchronisation worker.
 *
 * Long-running console command: reads tasks from the Redis stream named by
 * CacheKeyTools::billsSettleByNo(), builds the income payload for the task's
 * serial number and posts it through ApiService::walletAddIncome().
 *
 * Class StakeholderIncomeSyncCommand
 * @package App\Console\Commands
 */
class StakeholderIncomeSyncCommand extends Command
{
    use \App\Traits\Bills;

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'stakeholder:income:sync';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = '权益人收益同步';

    // Constants
    const COUNT = 1;          // number of stream entries requested per xread call
    const BLOCK_TIME = 1000;  // block argument passed to xread (presumably milliseconds - confirm against RedisClient)

    const TYPE_BILLS = "1001601"; // income type: bill revenue share

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * Runs an endless read/process loop; it does not return under normal
     * operation.
     *
     * @return int
     */
    public function handle()
    {
        // Start from the beginning of the stream; advanced to the id of each entry read.
        $id = '0';

        while (true) {

            $redis = RedisClient::instance('bills');
            $key   = CacheKeyTools::billsSettleByNo();

            // Process the message body - one income entry per iteration.
            if ($msg = $redis->xread([$key => $id], self::COUNT, self::BLOCK_TIME)) {

                $id        = key($msg[$key]);
                $task      = $msg[$key][$id];
                // A task without a serial number can never be resolved; read it
                // null-safely and treat it like a task with no sync record.
                $serial_no = $task['serial_no'] ?? null;
                $http_res  = [];
                $http_data = [];

                Log::channel('api')->warning(__METHOD__."stream-id:{$key}:{$id}-即将处理任务", ['task'=>$task]);

                // Fetch the record and build the request payload.
                if ($serial_no !== null && ($http_data = $this->resolveSyncData($serial_no))) {

                    // Push to the remote API.
                    $http_res = ApiService::walletAddIncome($http_data);

                    Log::channel('api')->info(__METHOD__."同步反馈", ['stream-id'=>$id, 'serial_no'=>$serial_no, 'request'=>$http_data, 'response'=>$http_res]);

                    // Persist the outcome.
                    $this->dealResponse($serial_no, $http_data, $http_res);
                } else {
                    // TODO: add a retry counter / task-removal mechanism here.
                    Log::channel('api')->info(__METHOD__, ['msg'=>'未找到同步记录', 'serial_no'=>$serial_no]);
                }

                // Acknowledge (delete) the stream entry where appropriate.
                $this->consumeTask($redis, $http_res, $key, $id);

                // Release per-task variables.
                unset($task, $serial_no, $http_res, $http_data);
            }

            unset($redis, $key);

        } // while (true)
    }

    /**
     * Persist the outcome of a sync attempt.
     *
     * Always writes a request/response log row. When the response is
     * non-empty, the sync record is updated: sync_status 1 with the remote id
     * on success (numeric code equal to 0), sync_status 2 otherwise. Both
     * writes happen in one transaction; any failure rolls back and is logged.
     *
     * NOTE(review): a code 500 response also sets sync_status to 2 here, while
     * consumeTask() keeps the stream entry; resolveSyncData() only picks up
     * records with sync_status 0, so such an entry will not be re-sent on a
     * later read - confirm this is intended.
     *
     * @param mixed $serial_no serial number of the sync record
     * @param mixed $request   payload that was sent
     * @param mixed $response  response returned by ApiService::walletAddIncome()
     */
    private function dealResponse($serial_no, $request, $response)
    {
        DB::beginTransaction();

        try {

            StakeholderIncomeSyncAppLogs::query()->create([
                'serial_no'     =>  $serial_no,
                'request_log'   =>  $request,
                'response_log'  =>  $response
            ]);

            if (!empty($response)) {
                // A response without a numeric code is not a confirmed success.
                $code      = $response['code'] ?? null;
                $succeeded = is_numeric($code) && $code == 0;

                StakeholderIncomeSyncApp::query()->where(['serial_no'=>$serial_no])->update([
                    'busi_id'     =>    $succeeded ? ($response['id'] ?? null) : null,
                    'sync_status' =>    $succeeded ? 1 : 2,
                ]);
            }

            DB::commit();

        } catch (\Throwable $e) {
            // Catch \Throwable (not just \Exception) so an \Error cannot leave
            // the transaction open in this long-running process.
            DB::rollBack();
            Log::channel('api')->error(__METHOD__, ['msg'=>$e->getMessage(), 'request'=>$request, 'response'=>$response]);
        }
    }

    /**
     * Delete the stream entry once its outcome is final.
     *
     * The entry is deleted when there is no response at all (nothing was sent
     * or nothing came back) and for the response codes listed below. For any
     * other code - including a response with no numeric code - the entry is
     * left in the stream.
     *
     * @param mixed $redis    client returned by RedisClient::instance()
     * @param mixed $http_res response of the sync call, empty when none
     * @param mixed $key      stream key
     * @param mixed $id       stream entry id
     */
    private function consumeTask($redis, $http_res, $key, $id)
    {
        if (empty($http_res)) {
            $redis->xdel($key, [$id]);
            return;
        }

        $code = $http_res['code'] ?? null;

        if (!is_numeric($code)) {
            // Outcome unknown: handle like an unrecognised code and keep the entry.
            return;
        }

        switch (intval($code)) {
            case 0:   // sync succeeded
            case 501: // income type not supported, see income type dictionary 10016
            case 502: // user or organisation does not exist on the platform, posting failed
            case 503: // income amount miscalculated, total = withdrawable amount + tax
            case 504: // duplicate posting
            case 506: // sub-bill sum does not match the first-level bill total
            case 507: // sub-bill paying company does not match the first-level bill paying company
                $redis->xdel($key, [$id]);
                break;
            case 500: // no access permission
            default:  // unrecognised code: keep the entry
                break;
        }
    }

    /**
     * Build the HTTP request data for a serial number.
     *
     * @param mixed $serial_no
     * @return array request body, or an empty array when there is no pending
     *               (sync_status 0) record, no detail rows, or the body
     *               cannot be built
     */
    private function resolveSyncData($serial_no)
    {
        $income = StakeholderIncomeSyncApp::query()->where(['serial_no'=>$serial_no, 'sync_status'=>0])->first();

        if (!$income) {
            return [];
        }

        $income_detail = StakeholderIncomeSyncAppDetails::query()->where(['track_serial_no'=>$serial_no])->orderBy('month')->get();

        if ($income_detail->isEmpty()) {
            return [];
        }

        return $this->formatHttpBody($income, $income_detail);
    }

    /**
     * Format the request body from a sync record and its detail rows.
     *
     * @param mixed $income        sync record
     * @param mixed $income_detail detail rows of the record (iterable)
     * @return array request body, or an empty array when the subject of the
     *               record cannot be found
     */
    private function formatHttpBody($income, $income_detail)
    {
        $subject = Subject::query()->where(['company_id'=>$income->company_id, 'no'=>$income->subject_no])->first();

        if ($subject === null) {
            // Without the subject the paying company is unknown; report it and
            // let the caller treat the task as unresolvable instead of
            // dereferencing null.
            Log::channel('api')->error(__METHOD__, [
                'msg'        => 'subject not found',
                'serial_no'  => $income->serial_no,
                'company_id' => $income->company_id,
                'subject_no' => $income->subject_no,
            ]);

            return [];
        }

        // NOTE(review): find() may return null; billsTitle() comes from the
        // Bills trait (not visible here) - confirm it tolerates a null bill.
        $bills           = Bills::query()->find($income->related_id);
        $payment_company = $subject->receipt_name; // full business-license name of the paying company

        $body = [
            'busiId'        =>  $income->serial_no,
            'cardNo'        =>  $income->identifier,
            'cost'          =>  $income->cost_amount,
            'deductAmount'  =>  $income->deduct_prepaid,
            'faxMoney'      =>  $income->fax_money,
            'money'         =>  $income->money,
            'totalMoney'    =>  $income->total_money,
            'type'          =>  self::TYPE_BILLS,
            'title'         =>  $this->billsTitle($bills), // bill title
            'paymentCompany'=>  $payment_company,
            'addLevelTwoIncomeRequests' => [],
        ];

        foreach ($income_detail as $item) {
            $body['addLevelTwoIncomeRequests'][] = [
                'actualTime' => $item->created_at,
                'busiId'     => $item->serial_no,
                'faxMoney'   => $item->fax_money,
                'money'      => $item->money,
                'totalMoney' => $item->total_money,
                'paymentCompany' => $payment_company,
                'songNum'    => $item->song_num,
                'title'      => $this->billsSubTitle($item->month),
                'type'       => self::TYPE_BILLS,
            ];
        }

        return $body;
    }


}