Commit 7d08d720 7d08d720fef206c2aab7c06701d76258dbc6c76e by lemon

*

1 parent e831ecaf
......@@ -65,16 +65,18 @@ class StakeholderIncomeSyncCommand extends Command
while (true) {
$redis = RedisClient::instance('bills');
$key = CacheKeyTools::billsSync();
$key = CacheKeyTools::billsSettleByNo();
//处理消息体
//处理消息体 - 每次处理1个入账的数据
if($msg = $redis->xread([$key=>$id], self::COUNT, self::BLOCK_TIME)) {
$id = key($msg[$key]);
$serial_no = $msg[$key][$id]['serial_no'];
$http_res = [];
$id = key($msg[$key]);
$task = $msg[$key][$id];
//$batch_no = $task['batch_no'];
$serial_no = $task['serial_no'];
$http_res = [];
Log::channel('api')->warning(__METHOD__."streamid:{$id}-即将处理任务", ['item'=>$msg[$key][$id]]);
Log::channel('api')->warning(__METHOD__."streamid:{$key}:{$id}-即将处理任务", ['task'=>$task]);
//获取记录
if ($http_data = $this->resolveSyncData($serial_no)) {
......@@ -86,12 +88,13 @@ class StakeholderIncomeSyncCommand extends Command
//处理返回体
$this->dealResponse($serial_no, $http_data, $http_res);
$this->consumeTask($http_res, $redis, $key, $id);
} else {
//此处需要加入 重试计数移除任务机制
Log::channel('api')->info(__METHOD__, ['msg'=>'暂时找不到需要同步的对应的收益记录', 'serial_no'=>$serial_no]);
Log::channel('api')->info(__METHOD__, ['msg'=>'未找到同步记录', 'serial_no'=>$serial_no]);
}
//标记处理
$this->consumeTask($redis, $http_res, $key, $id);
}
}
}
......@@ -115,7 +118,7 @@ class StakeholderIncomeSyncCommand extends Command
if (!empty($response)) {
StakeholderIncomeSyncApp::query()->where(['serial_no'=>$serial_no])->update([
'busi_id' => $response['id'],
'busi_id' => ($response['code'] == 0 ) ? $response['id'] : null,
'sync_status' => ($response['code'] == 0 ) ? 1 : 2,
]);
}
......@@ -130,16 +133,15 @@ class StakeholderIncomeSyncCommand extends Command
/**
* 将任务id标记为删除
* @param $http_res
* @param $redis
* @param $http_res
* @param $key
* @param $id
* @return mixed
*/
private function consumeTask($http_res, $redis, $key, $id)
private function consumeTask($redis, $http_res, $key, $id)
{
if (empty($http_res)) {
return $redis->xdel($key, [$id]);
$redis->xdel($key, [$id]);
} else {
switch (intval($http_res['code'])) {
case 0: //同步成功
......
......@@ -34,4 +34,11 @@ class CacheKeyTools
return str_replace('#serial_no#', $serial_no, config('cache.key')['bills_confirm']);
}
/**
* @return mixed
*/
public static function billsSettleByNo()
{
return config('cache.key')['bills_settle_no'];
}
}
......
......@@ -109,9 +109,10 @@ return [
//cache_key
'key'=> [
'bills_sync' => 'bills:sync',
'channelname' => 'channelname',
'bills_confirm' => 'serial:#serial_no#',
'bills_sync' => 'bills:sync',
'channelname' => 'channelname',
'bills_confirm' => 'serial:#serial_no#',
'bills_settle_no' => 'bills:settle:no'
]
];
......