objResponse;

        if (!count($arrData)) {
            $objResponse->addLogEntry('Magento: Got no data from transmitter', 'global', $this->getClassName(), 0, true);
            return;
        }

        $objResponse->addLogEntry('Magento: Start getting datalake...', 'global', $this->getClassName(), 0);
        $intStartTime = $objResponse->microtimeFloat();
        $this->getData($arrData);
        $objResponse->addLogEntry('Magento: Getting datalake completed', 'global', $this->getClassName(), $objResponse->getDuration($intStartTime));

        return $objResponse->getEntityMap();
    }

    /**
     * Reads product rows (one per website/product pair) from the database in
     * pages of self::LIMIT and adds each row to the response as a 'datalake'
     * entity keyed 'entry<N>', where N is the row's absolute position.
     *
     * Failures are not thrown to the caller: missing tables, missing EAV
     * metadata and any exception raised while querying are written to the
     * response log as error entries.
     *
     * @param array $arrData Request data. When $arrData['params']['from'] is set
     *                       and non-empty, only products whose updated_at is
     *                       later than that value are returned.
     *
     * @return void
     */
    protected function getData($arrData)
    {
        $objResponse = $this->objResponse;

        try {
            $connection = $this->objResourceConnection->getConnection();

            $arrTables = $this->resolveDatalakeTables($connection);
            if ($arrTables === null) {
                // Insert error message
                $objResponse->addLogEntry('Magento: Table not found', 'global', $this->getClassName(), 0, true);
                return;
            }

            // fetchOne() yields false when no row matches; check before building SQL with the id.
            $entityTypeId = $connection->fetchOne(
                'SELECT entity_type_id FROM ' . $arrTables['eav_entity_type']
                . " WHERE entity_type_code = 'catalog_product'"
            );
            if ($entityTypeId === false || $entityTypeId === null) {
                $objResponse->addLogEntry('Magento: Entity type catalog_product not found', 'global', $this->getClassName(), 0, true);
                return;
            }
            $entityTypeId = (int) $entityTypeId;

            $attrIdPrice = $connection->fetchOne(
                'SELECT attribute_id FROM ' . $arrTables['eav_attribute']
                . " WHERE attribute_code = 'price'"
                . ' AND entity_type_id = ' . $entityTypeId
            );
            if ($attrIdPrice === false || $attrIdPrice === null) {
                $objResponse->addLogEntry('Magento: Attribute price not found', 'global', $this->getClassName(), 0, true);
                return;
            }
            $attrIdPrice = (int) $attrIdPrice;

            // The "from" value comes from the request, so it is bound as a
            // parameter instead of being concatenated into the SQL text.
            $arrBind = [];
            $blnFilterByDate = !empty($arrData['params']['from']);
            if ($blnFilterByDate) {
                $arrBind[':updated_from'] = (string) $arrData['params']['from'];
            }

            $query = $this->buildDatalakeQuery($arrTables, $entityTypeId, $attrIdPrice, $blnFilterByDate);

            $offset = 0;
            while (true) {
                $arrRows = $connection->fetchAll(
                    $query . ' LIMIT ' . self::LIMIT . ' OFFSET ' . $offset,
                    $arrBind
                );

                // An empty page means everything has been read.
                if (!$arrRows) {
                    break;
                }

                foreach ($arrRows as $num => $row) {
                    $strKey = 'entry' . ($offset + $num);
                    $objResponse->addEntities(
                        [$this->getClassName() => [$strKey => $row]],
                        'datalake',
                        [$strKey]
                    );
                }

                $offset += self::LIMIT;
            }
        } catch (\Exception $e) {
            $objResponse->addLogEntry('Magento: ' . $e->getMessage(), 'global', $this->getClassName(), 0, true);
        }
    }

    /**
     * Resolves the real (prefix-aware) names of every table the datalake
     * queries read and verifies that each one exists.
     *
     * @param mixed $connection Connection returned by the resource connection.
     *
     * @return array|null Map of logical table name => resolved table name, or
     *                    null when at least one table is missing.
     */
    private function resolveDatalakeTables($connection)
    {
        $arrRequired = [
            'catalog_product_entity',
            'eav_attribute',
            'eav_entity_type',
            'catalog_product_entity_varchar',
            'catalog_product_entity_decimal',
            'catalog_product_website',
            'store',
            // Read by the currency sub-select of the product query.
            'core_config_data',
        ];

        $arrExisting = $connection->fetchCol('show tables');
        $arrResolved = [];

        foreach ($arrRequired as $strTable) {
            $strResolved = $this->objResourceConnection->getTableName($strTable);
            if (!in_array($strResolved, $arrExisting, true)) {
                return null;
            }
            $arrResolved[$strTable] = $strResolved;
        }

        return $arrResolved;
    }

    /**
     * Builds the product query without its LIMIT/OFFSET suffix.
     *
     * The CTE numbers the joined rows per (website_id, entity_id) ordered by
     * store_id; the outer select keeps only the first row of each group.
     *
     * @param array $arrTables       Map of logical => resolved table names.
     * @param int   $entityTypeId    entity_type_id of 'catalog_product'.
     * @param int   $attrIdPrice     attribute_id of the 'price' attribute.
     * @param bool  $blnFilterByDate Whether to add the updated_at filter, which
     *                               expects the :updated_from bind parameter.
     *
     * @return string
     */
    private function buildDatalakeQuery(array $arrTables, $entityTypeId, $attrIdPrice, $blnFilterByDate)
    {
        $filter = $blnFilterByDate ? ' AND cpe.updated_at > :updated_from ' : '';

        return ' WITH RankedProducts AS ( '
            . ' SELECT '
            . ' cpe.entity_id, '
            . ' cpe.sku, '
            . ' cpe.updated_at, '
            . ' cpec.value AS product_name, '
            . ' st.store_id, '
            . ' cpw.website_id, '
            . ' st.name as website_name, '
            . ' cped.value AS price, '
            . ' ( SELECT ccd.value FROM ' . $arrTables['core_config_data'] . ' AS ccd '
            . " WHERE ccd.path = 'currency/options/base' "
            . " AND (ccd.scope = 'default') LIMIT 1) AS currency, "
            . ' ROW_NUMBER() OVER (PARTITION BY cpw.website_id, cpe.entity_id ORDER BY st.store_id) AS rn '
            . ' FROM ' . $arrTables['catalog_product_entity'] . ' AS cpe '
            . ' JOIN ' . $arrTables['catalog_product_entity_varchar'] . ' AS cpec ON cpe.entity_id = cpec.entity_id '
            . ' JOIN ' . $arrTables['eav_attribute'] . ' AS ea ON cpec.attribute_id = ea.attribute_id '
            . ' JOIN ' . $arrTables['catalog_product_entity_decimal'] . ' AS cped ON cpe.entity_id = cped.entity_id '
            . ' JOIN ' . $arrTables['catalog_product_website'] . ' AS cpw ON cpe.entity_id = cpw.product_id '
            . ' JOIN ' . $arrTables['store'] . ' AS st ON cpw.website_id = st.website_id '
            . " WHERE ea.attribute_code = 'name' AND ea.entity_type_id = " . (int) $entityTypeId
            . ' AND cped.attribute_id = ' . (int) $attrIdPrice
            . $filter
            . ' ) '
            . ' SELECT '
            . ' entity_id, '
            . ' sku, '
            . ' updated_at, '
            . ' product_name, '
            . ' store_id, '
            . ' website_id, '
            . ' website_name, '
            . ' price, '
            . ' currency '
            . ' FROM '
            . ' RankedProducts '
            . ' WHERE rn = 1 '
            . ' ORDER BY website_id, store_id, sku ';
    }
}