invoker = $invoker; $this->resource = $resource; $this->messageController = $messageController; $this->configuration = $configuration; $this->importApi = $importApi; $this->publisher = $publisher; //$this->logger = $logger; $locale->loadData(); Phrase::setRenderer($renderer); } /** * {@inheritdoc} */ public function process($maxNumberOfMessages = null) { $queue = $this->configuration->getQueue(); if (!isset($maxNumberOfMessages)) { $queue->subscribe($this->getTransactionCallback($queue)); } else { $this->invoker->invoke($queue, $maxNumberOfMessages, $this->getTransactionCallback($queue)); } } /** * Get transaction callback. This handles the case of async. * * @param QueueInterface $queue * @return \Closure */ private function getTransactionCallback(QueueInterface $queue) { return function (EnvelopeInterface $message) use ($queue) { /** @var LockInterface $lock */ $lock = null; try { $ida = rand(); $lock = $this->messageController->lock($message, $this->configuration->getConsumerName()); $arrEntityMap = $this->importApi->executeQueueData($message->getBody()); if ($arrEntityMap === false) { $queue->reject($message); // if get error in message process } $queue->acknowledge($message); // send acknowledge to queue try { $this->publisher->publish($arrEntityMap); } catch (\Exception $e) { file_put_contents('/tmp/test.log', "EX:" . print_r($e->getMessage(), 1) . "\n", FILE_APPEND); } } catch (MessageLockException $exception) { $queue->acknowledge($message); } catch (ConnectionLostException $e) { $queue->acknowledge($message); if ($lock) { $this->resource->getConnection() ->delete($this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()]); } } catch (NotFoundException $e) { $queue->acknowledge($message); //$this->logger->warning($e->getMessage()); } catch (\Exception $e) { $queue->reject($message, false, $e->getMessage()); $queue->acknowledge($message); if ($lock) { $this->resource->getConnection() ->delete($this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()]); } } }; } }