数据同步
Showing
2 changed files
with
31 additions
and
8 deletions
... | @@ -6,6 +6,7 @@ use App\Helper\CacheKeyTools; | ... | @@ -6,6 +6,7 @@ use App\Helper\CacheKeyTools; |
6 | use App\Helper\RedisClient; | 6 | use App\Helper\RedisClient; |
7 | use App\Models\Legal\Bills; | 7 | use App\Models\Legal\Bills; |
8 | use App\Models\Legal\Company; | 8 | use App\Models\Legal\Company; |
9 | use App\Models\Legal\StakeholderIncomeByPayer; | ||
9 | use App\Services\ApiService; | 10 | use App\Services\ApiService; |
10 | use App\Traits\TaxReckon; | 11 | use App\Traits\TaxReckon; |
11 | use Illuminate\Bus\Queueable; | 12 | use Illuminate\Bus\Queueable; |
... | @@ -16,6 +17,10 @@ use Illuminate\Queue\InteractsWithQueue; | ... | @@ -16,6 +17,10 @@ use Illuminate\Queue\InteractsWithQueue; |
16 | use Illuminate\Queue\SerializesModels; | 17 | use Illuminate\Queue\SerializesModels; |
17 | use Illuminate\Support\Facades\Log; | 18 | use Illuminate\Support\Facades\Log; |
18 | 19 | ||
20 | /** | ||
21 | * Class StakeholderIncomeSyncJob | ||
22 | * @package App\Jobs | ||
23 | */ | ||
19 | class StakeholderIncomeSyncJob implements ShouldQueue | 24 | class StakeholderIncomeSyncJob implements ShouldQueue |
20 | { | 25 | { |
21 | use Dispatchable, InteractsWithQueue, Queueable, SerializesModels, TaxReckon; | 26 | use Dispatchable, InteractsWithQueue, Queueable, SerializesModels, TaxReckon; |
... | @@ -51,7 +56,7 @@ class StakeholderIncomeSyncJob implements ShouldQueue | ... | @@ -51,7 +56,7 @@ class StakeholderIncomeSyncJob implements ShouldQueue |
51 | $key = CacheKeyTools::billsSync(); | 56 | $key = CacheKeyTools::billsSync(); |
52 | 57 | ||
53 | //处理消息体 | 58 | //处理消息体 |
54 | if($msg = $redis->xread([$key=>$id], 1, 3)) { | 59 | if($msg = $redis->xread([$key=>$id], self::COUNT, self::BLOCK_TIME)) { |
55 | 60 | ||
56 | $id = key($msg[$key]); | 61 | $id = key($msg[$key]); |
57 | $income_item = $msg[$key][$id]; | 62 | $income_item = $msg[$key][$id]; |
... | @@ -59,9 +64,8 @@ class StakeholderIncomeSyncJob implements ShouldQueue | ... | @@ -59,9 +64,8 @@ class StakeholderIncomeSyncJob implements ShouldQueue |
59 | $company = Company::query()->find($income_item['company_id']); | 64 | $company = Company::query()->find($income_item['company_id']); |
60 | $bills = Bills::query()->find($income_item['related_id']); | 65 | $bills = Bills::query()->find($income_item['related_id']); |
61 | $channel = 'TME'; | 66 | $channel = 'TME'; |
62 | $title = "{$bills->bill_section_start}-{$bills->bill_section_end}账单/{$channel}"; | ||
63 | 67 | ||
64 | $http_res = ApiService::walletAddIncome([ | 68 | $http_data = [ |
65 | 'cardNo'=>$income_item['identifier'], | 69 | 'cardNo'=>$income_item['identifier'], |
66 | 'faxMoney'=>$income_item['fax_money'], | 70 | 'faxMoney'=>$income_item['fax_money'], |
67 | 'money'=>$income_item['money'], | 71 | 'money'=>$income_item['money'], |
... | @@ -69,18 +73,31 @@ class StakeholderIncomeSyncJob implements ShouldQueue | ... | @@ -69,18 +73,31 @@ class StakeholderIncomeSyncJob implements ShouldQueue |
69 | 'paymentCompany'=>$company->receipt_name, | 73 | 'paymentCompany'=>$company->receipt_name, |
70 | 'type'=>self::TYPE_BILLS, | 74 | 'type'=>self::TYPE_BILLS, |
71 | 'title'=>"{$bills->bill_section_start}-{$bills->bill_section_end}账单/{$channel}", | 75 | 'title'=>"{$bills->bill_section_start}-{$bills->bill_section_end}账单/{$channel}", |
72 | ]); | 76 | ]; |
77 | |||
78 | $http_res = ApiService::walletAddIncome($http_data); | ||
73 | 79 | ||
74 | if (empty($http_res)) { | 80 | if (empty($http_res)) { |
75 | //重试 401/403/403 | 81 | //重试 401/403/403 |
76 | Log::info(__METHOD__.'请求失败'); | 82 | Log::channel('api')->warning(__METHOD__."streamid:{$id}-api请求失败", ['income_item'=>$income_item, 'http_data'=>$http_data]); |
77 | } else { | 83 | } else { |
78 | switch ($http_res['code']) { | ||
79 | 84 | ||
80 | } | 85 | $update = [ |
81 | } | 86 | 'http_log'=>json_encode($http_res, JSON_UNESCAPED_UNICODE), |
87 | 'busi_id'=>$http_res['data']['busiId'], | ||
88 | ]; | ||
82 | 89 | ||
90 | if ($http_res['code'] == 0) { | ||
91 | $update['sync_status'] = 1; | ||
92 | } else { | ||
93 | $update['sync_status'] = 2; | ||
94 | } | ||
83 | 95 | ||
96 | //记录请求api返回体并修改状态 | ||
97 | if (StakeholderIncomeByPayer::query()->where(['serial_no'=>$income_item['serial_no']])->update($update)) { | ||
98 | $redis->xdel($key, [$id]); | ||
99 | } | ||
100 | } | ||
84 | } | 101 | } |
85 | 102 | ||
86 | } | 103 | } | ... | ... |
... | @@ -108,6 +108,12 @@ return [ | ... | @@ -108,6 +108,12 @@ return [ |
108 | 'level' => env('LOG_LEVEL', 'debug'), | 108 | 'level' => env('LOG_LEVEL', 'debug'), |
109 | 'days' => 14, | 109 | 'days' => 14, |
110 | ], | 110 | ], |
111 | |||
112 | 'api' => [ | ||
113 | 'driver' => 'daily', | ||
114 | 'path' => storage_path('logs/laravel-api.log'), | ||
115 | 'level' => env('LOG_LEVEL', 'debug'), | ||
116 | ] | ||
111 | ], | 117 | ], |
112 | 118 | ||
113 | ]; | 119 | ]; | ... | ... |
-
Please register or sign in to post a comment