objResponse; if (!count($arrData)) { $objResponse->addLogEntry('Magento: Got no data from transmitter', 'global', $this->getClassName(), 0, true); return; } $objResponse = $this->objResponse; $objResponse->addLogEntry('Magento: Start getting datalake...', 'global', $this->getClassName(), 0); $intStartTime = $objResponse->microtimeFloat(); $this->getData($arrData); $objResponse->addLogEntry('Magento: Getting datalake completed', 'global', $this->getClassName(), $objResponse->getDuration($intStartTime)); return $objResponse->getEntityMap(); }

    /**
     * Reads product rows page by page and adds each row to the response as a
     * 'datalake' entity keyed 'entry<N>', where N is the row's absolute position
     * (page offset + index within the page).
     *
     * For every (website, product) pair the query keeps a single row: the one
     * with the lowest store_id (ROW_NUMBER() ... ORDER BY st.store_id, rn = 1).
     * Each row carries entity_id, sku, updated_at, product_name, store_id,
     * website_id and website_name (website_name is taken from store.name).
     *
     * Nothing is thrown to the caller: a missing table, a missing
     * 'catalog_product' entity type, or any \Exception raised while querying
     * is reported through $objResponse->addLogEntry() and the method returns.
     *
     * @param array $arrData Transmitter payload. When $arrData['params']['from']
     *                       is set and non-empty, only products whose updated_at
     *                       is later than that value are returned.
     */
    protected function getData($arrData)
    {
        $objResponse = $this->objResponse;

        try {
            $connection = $this->objResourceConnection->getConnection();

            // Resolve every table name once through the resource connection so the
            // existence check and the queries below refer to exactly the same tables.
            $arrTableNames = [];
            $arrRequiredTables = [
                'catalog_product_entity',
                'catalog_product_entity_varchar',
                'eav_attribute',
                'eav_entity_type',
                'catalog_product_website',
                'store',
            ];
            foreach ($arrRequiredTables as $strTable) {
                $arrTableNames[$strTable] = $this->objResourceConnection->getTableName($strTable);
            }

            $arrTables = $connection->fetchCol('show tables');
            foreach ($arrTableNames as $strTableName) {
                if (!in_array($strTableName, $arrTables, true)) {
                    // Insert error message and return
                    $objResponse->addLogEntry('Magento: Table not found', 'global', $this->getClassName(), 0, true);
                    return;
                }
            }

            $arrEntityTypeRows = $connection->fetchAll(
                ' SELECT entity_type_id FROM ' . $arrTableNames['eav_entity_type']
                . " WHERE entity_type_code = 'catalog_product' "
            );
            if (empty($arrEntityTypeRows)) {
                // Without the entity type id the main query cannot be built.
                $objResponse->addLogEntry(
                    'Magento: Entity type catalog_product not found',
                    'global',
                    $this->getClassName(),
                    0,
                    true
                );
                return;
            }
            $entityTypeId = (int) $arrEntityTypeRows[0]['entity_type_id'];

            // The start date comes from the transmitter payload, so it is passed as a
            // bound parameter instead of being concatenated into the SQL text.
            $arrBind = [];
            $filter = '';
            if (!empty($arrData['params']['from'])) {
                $filter = ' AND cpe.updated_at > :from_date ';
                $arrBind[':from_date'] = (string) $arrData['params']['from'];
            }

            // Everything except LIMIT/OFFSET is identical for each page: build it once.
            $queryBase = ' WITH RankedProducts AS ( '
                . ' SELECT '
                . ' cpe.entity_id, '
                . ' cpe.sku, '
                . ' cpe.updated_at, '
                . ' cpev.value AS product_name, '
                . ' st.store_id, '
                . ' cpw.website_id, '
                . ' st.name as website_name, '
                . ' ROW_NUMBER() OVER (PARTITION BY cpw.website_id, cpe.entity_id ORDER BY st.store_id) AS rn '
                . ' FROM ' . $arrTableNames['catalog_product_entity'] . ' AS cpe '
                . ' JOIN ' . $arrTableNames['catalog_product_entity_varchar'] . ' AS cpev'
                . ' ON cpe.entity_id = cpev.entity_id '
                . ' JOIN ' . $arrTableNames['eav_attribute'] . ' AS ea'
                . ' ON cpev.attribute_id = ea.attribute_id '
                . ' JOIN ' . $arrTableNames['catalog_product_website'] . ' AS cpw'
                . ' ON cpe.entity_id = cpw.product_id '
                . ' JOIN ' . $arrTableNames['store'] . ' AS st'
                . ' ON cpw.website_id = st.website_id '
                . " WHERE ea.attribute_code = 'name' AND ea.entity_type_id = " . $entityTypeId
                . $filter
                . ' ) '
                . ' SELECT '
                . ' entity_id, '
                . ' sku, '
                . ' updated_at, '
                . ' product_name, '
                . ' store_id, '
                . ' website_id, '
                . ' website_name '
                . ' FROM '
                . ' RankedProducts '
                . ' WHERE rn = 1 '
                . ' ORDER BY website_id, store_id, sku ';

            // LIMIT/OFFSET stay inline as integers; both are computed locally, never user input.
            $intLimit = (int) self::LIMIT;
            $offset = 0;

            // Fetch page after page until a page comes back empty.
            do {
                $arrRows = $connection->fetchAll(
                    $queryBase . ' LIMIT ' . $intLimit . ' OFFSET ' . $offset,
                    $arrBind
                );

                foreach ($arrRows as $num => $row) {
                    $strEntryKey = 'entry' . ($offset + $num);
                    $objResponse->addEntities(
                        [$this->getClassName() => [$strEntryKey => $row]],
                        'datalake',
                        [$strEntryKey]
                    );
                }

                $offset += $intLimit;
            } while (count($arrRows) > 0);
        } catch (\Exception $e) {
            $objResponse->addLogEntry('Magento: ' . $e->getMessage(), 'global', $this->getClassName(), 0, true);
        }
    }
}