objResponse; if (!count($arrData)) { $objResponse->addLogEntry('Magento: Got no data from transmitter', 'global', $this->getClassName(), 0, true); return; } $objResponse->addLogEntry('Magento: Start getting datalake...', 'global', $this->getClassName(), 0); $intStartTime = $objResponse->microtimeFloat(); $this->getData($arrData); $objResponse->addLogEntry('Magento: Getting datalake completed', 'global', $this->getClassName(), $objResponse->getDuration($intStartTime)); return $objResponse->getEntityMap(); } protected function getData($arrData) { $objResponse = $this->objResponse; try{ $connection = $this->objResourceConnection->getConnection(); $arrTables = $connection->fetchCol('show tables'); if (!in_array($this->objResourceConnection->getTableName('catalog_product_entity'), $arrTables) || !in_array($this->objResourceConnection->getTableName('review'), $arrTables) || !in_array($this->objResourceConnection->getTableName('review_store'), $arrTables) || !in_array($this->objResourceConnection->getTableName('review_detail'), $arrTables) || !in_array($this->objResourceConnection->getTableName('catalog_product_website'), $arrTables) || !in_array($this->objResourceConnection->getTableName('rating_option_vote'), $arrTables) || !in_array($this->objResourceConnection->getTableName('store'), $arrTables)) { // Insert error message $objResponse->addLogEntry('Magento: Table not found', 'global', $this->getClassName(), 0, true); return; } $start_date = $arrData['params']['from']; $filter = ($start_date ? " AND r.created_at > '" . $start_date . "' " : ''); $rowsFound = true; $offset = 0; while ($rowsFound) { $rowsFound = false; $query = " WITH RankedProducts AS ( " . " SELECT " . " r.review_id, " . " r.created_at, " . " r.entity_id, " . " r.entity_pk_value AS product_id, " . " r.status_id, " . " rd.title, " . " rd.detail, " . " rd.nickname, " . " rd.customer_id, " . " s.store_id, " . " s.name AS website_name, " . " rv.value AS rating_value, " . " rv.entity_pk_value AS rated_product_id, " . " cpe.sku, " . " cpw.website_id, " . " ROW_NUMBER() OVER (PARTITION BY cpw.website_id, cpe.entity_id ORDER BY s.store_id) AS rn " . " FROM review AS r " . " JOIN review_detail AS rd ON r.review_id = rd.review_id " . " JOIN review_store AS rs ON r.review_id = rs.review_id " . " JOIN store AS s ON rs.store_id = s.store_id " . " JOIN catalog_product_entity AS cpe ON cpe.entity_id=r.entity_pk_value" . " JOIN catalog_product_website AS cpw ON cpe.entity_id = cpw.product_id " . " LEFT JOIN rating_option_vote AS rv ON r.review_id = rv.review_id " . $filter . " ) " . " SELECT " . " entity_id, " . " review_id, " . " sku, " . " created_at, " . " store_id, " . " website_id, " . " website_name, " . " title, " . " detail " . " FROM " . " RankedProducts " . " WHERE rn = 1 " . " ORDER BY website_id, store_id, sku " . " LIMIT " . self::LIMIT . " OFFSET " . $offset; $arrRows = $connection->fetchAll($query); foreach($arrRows as $num => $row) { $rowsFound = true; $objResponse->addEntities([$this->getClassName() => ['entry'.($offset+$num) => $row ]], 'datalake', ['entry'.($offset+$num)]); } $offset += self::LIMIT; } } catch (\Exception $e) { $objResponse->addLogEntry('Magento: ' . $e->getMessage(), 'global', $this->getClassName(), 0, true); } } }