StakeholderIncomeSyncCommand.php 5.84 KB
<?php

namespace App\Console\Commands;

use App\Helper\CacheKeyTools;
use App\Helper\ErrorCode;
use App\Helper\RedisClient;
use App\Helper\Response;
use App\Models\Legal\Bills;
use App\Models\Legal\Company;
use App\Models\Legal\StakeholderIncomeSyncApp;
use App\Models\Legal\StakeholderIncomeSyncAppDetails;
use App\Models\Legal\StakeholderIncomeSyncAppLogs;
use App\Services\ApiService;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;

/**
 * Stakeholder (partner) income synchronisation worker.
 *
 * Long-running console command that reads sync tasks from a Redis stream,
 * builds a request body from the stored income record and its detail rows,
 * submits it through ApiService::walletAddIncome() and records the outcome.
 *
 * Class StakeholderIncomeSyncCommand
 * @package App\Console\Commands
 */
class StakeholderIncomeSyncCommand extends Command
{
    use \App\Traits\Bills;

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'stakeholder:income:sync';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = '权益人收益同步';

    // Constants
    const COUNT = 1;         // passed to xread as the per-call entry count
    const BLOCK_TIME = 1000; // passed to xread as the block argument (presumably milliseconds — confirm against RedisClient)

    const TYPE_BILLS = "1001601"; // income type sent to the wallet API: bill revenue share

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * Loops forever: reads entries from the bills-sync stream, processes each
     * one and deletes it from the stream afterwards. Exceptions raised while
     * processing are not caught here and terminate the command.
     *
     * @return int Never reached in practice; the loop has no exit condition.
     */
    public function handle()
    {
        // Cursor for xread. Handled entries are deleted, so starting from '0'
        // picks up whatever is still pending in the stream.
        $id = '0';

        while (true) {

            $redis = RedisClient::instance('bills');
            $key   = CacheKeyTools::billsSync();

            $msg = $redis->xread([$key=>$id], self::COUNT, self::BLOCK_TIME);

            // Nothing returned for our stream (timeout, error or unexpected shape): poll again.
            if (empty($msg) || empty($msg[$key]) || !is_array($msg[$key])) {
                continue;
            }

            // Iterate every returned entry so the loop stays correct if COUNT is raised.
            foreach ($msg[$key] as $entry_id => $item) {
                $id        = (string) $entry_id;
                $serial_no = (is_array($item) && isset($item['serial_no'])) ? $item['serial_no'] : '';

                Log::channel('api')->warning(__METHOD__."streamid:{$id}-即将处理任务", ['item'=>$item]);

                if ($serial_no === '') {
                    // A malformed entry can never be processed; log it and fall through
                    // to consumeTask() so it does not block the stream forever.
                    Log::channel('api')->error(__METHOD__, ['msg'=>'stream entry has no serial_no, skipped', 'stream-id'=>$id, 'item'=>$item]);

                } elseif ($http_data = $this->resolveSyncData($serial_no)) {
                    // Submit the income to the wallet service.
                    $http_res = ApiService::walletAddIncome($http_data);

                    Log::channel('api')->info(__METHOD__."同步反馈", ['stream-id'=>$id, 'serial_no'=>$serial_no, 'request'=>$http_data, 'response'=>$http_res]);

                    // Persist the request/response and mark the record as synced on success.
                    $this->dealResponse($serial_no, $http_data, $http_res);

                } else {
                    Log::channel('api')->info(__METHOD__, ['msg'=>'找不到需要同步的对应的收益记录', 'serial_no'=>$serial_no]);
                }

                // The entry is removed whatever the outcome above was.
                $this->consumeTask($redis, $key, $id);
            }
        }
    }

    /**
     * Record the API exchange and, on success, mark the income record as synced.
     *
     * The log row and the status update are written in one DB transaction; on
     * any failure the transaction is rolled back and the error is logged
     * instead of being rethrown.
     *
     * @param mixed $serial_no serial number of the income record
     * @param mixed $request   request body that was sent to the wallet API
     * @param mixed $response  value returned by ApiService::walletAddIncome()
     */
    private function dealResponse($serial_no, $request, $response)
    {
        DB::beginTransaction();

        try {

            StakeholderIncomeSyncAppLogs::query()->create([
                'serial_no'     =>  $serial_no,
                'request_log'   =>  $request,
                'response_log'  =>  $response
            ]);

            // Success means an explicit numeric zero code. A loose `== 0` would also
            // accept null/false and, before PHP 8, any non-numeric string.
            $succeeded = isset($response['code'])
                && is_numeric($response['code'])
                && (float) $response['code'] === 0.0;

            if ($succeeded) {
                StakeholderIncomeSyncApp::query()->where(['serial_no'=>$serial_no])->update([
                    'busi_id'     =>    $response['id'],
                    'sync_status' =>    1,
                ]);
            }

            DB::commit();

        } catch (\Throwable $e) {
            // \Throwable rather than \Exception so that \Error subclasses
            // (e.g. TypeError) cannot leave the transaction open.
            DB::rollBack();
            Log::channel('api')->error(__METHOD__, ['msg'=>$e->getMessage(), 'request'=>$request, 'response'=>$response]);
        }
    }

    /**
     * Consume a task by deleting its entry from the stream.
     *
     * @param mixed  $redis client returned by RedisClient::instance()
     * @param string $key   stream key
     * @param string $id    stream entry id
     * @return mixed whatever the client's xdel() returns
     */
    private function consumeTask($redis, $key, $id)
    {
        return $redis->xdel($key, [$id]);
    }

    /**
     * Build the HTTP request data for one income record.
     *
     * Only records with sync_status = 0 that have at least one detail row
     * (looked up by track_serial_no, ordered by month) produce a body.
     *
     * @param mixed $serial_no serial number of the income record
     * @return array request body, or an empty array when there is nothing to sync
     */
    private function resolveSyncData($serial_no)
    {
        $income = StakeholderIncomeSyncApp::query()
            ->where(['serial_no'=>$serial_no, 'sync_status'=>0])
            ->first();

        if (!$income) {
            return [];
        }

        $income_detail = StakeholderIncomeSyncAppDetails::query()
            ->where(['track_serial_no'=>$serial_no])
            ->orderBy('month')
            ->get();

        if ($income_detail->isEmpty()) {
            return [];
        }

        return $this->formatHttpBody($income, $income_detail);
    }

    /**
     * Format the wallet API request body from an income record and its details.
     *
     * @param mixed $income        income record (StakeholderIncomeSyncApp row)
     * @param mixed $income_detail iterable of detail rows for that record
     * @return array request body, or an empty array when the paying company cannot be found
     */
    private function formatHttpBody($income, $income_detail)
    {

        $company = Company::query()->find($income->company_id);
        $bills   = Bills::query()->find($income->related_id);

        // Without the company the paymentCompany field cannot be filled, and reading
        // a property on null would abort the worker before the task is consumed.
        // Returning an empty body makes the caller skip this record.
        if (!$company) {
            Log::channel('api')->error(__METHOD__, ['msg'=>'company not found for income record, sync skipped', 'serial_no'=>$income->serial_no, 'company_id'=>$income->company_id]);

            return [];
        }

        // NOTE(review): $bills may be null here; whether billsTitle() accepts that
        // depends on the Bills trait, which is not visible in this file.
        $body = [
            'busiId'        =>  $income->serial_no,
            'cardNo'        =>  $income->identifier,
            'cost'          =>  $income->cost_amount,
            'deductAmount'  =>  $income->deduct_prepaid,
            'faxMoney'      =>  $income->fax_money,
            'money'         =>  $income->money,
            'totalMoney'    =>  $income->total_money,
            'type'          =>  self::TYPE_BILLS,
            'title'         =>  $this->billsTitle($bills), // bill title
            'paymentCompany'=>  $company->receipt_name,  // full business-licence name of the paying company
            'addLevelTwoIncomeRequests' => [],
        ];

        // One second-level income entry per detail row.
        foreach ($income_detail as $item) {
            $body['addLevelTwoIncomeRequests'][] = [
                'actualTime' => $item->created_at,
                'busiId'     => $item->serial_no,
                'faxMoney'   => $item->fax_money,
                'money'      => $item->money,
                'totalMoney' => $item->total_money,
                'paymentCompany' => $company->receipt_name,  // full business-licence name of the paying company
                'songNum'    => $item->song_num,
                'title'      => $this->billsSubTitle($item->month),
                'type'       => self::TYPE_BILLS,
            ];
        }

        return $body;
    }


}