diff --git a/app/controllers/InitialDataController.scala b/app/controllers/InitialDataController.scala index 1cd2ac3af17..2b109b6c189 100644 --- a/app/controllers/InitialDataController.scala +++ b/app/controllers/InitialDataController.scala @@ -143,7 +143,11 @@ Samplecountry "This is a wonderful dummy publication, it has authors, it has a link, it has a doi number, those could go here.\nLorem [ipsum](https://github.com/scalableminds/webknossos) dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua.") ) private val defaultDataStore = - DataStore(conf.Datastore.name, conf.Http.uri, conf.Datastore.publicUri.getOrElse(conf.Http.uri), conf.Datastore.key) + DataStore(conf.Datastore.name, + conf.Http.uri, + conf.Datastore.publicUri.getOrElse(conf.Http.uri), + conf.Datastore.key, + reportUsedStorageEnabled = true) // TODOM: Undo this private val defaultAiModel = AiModel( ObjectId("66544a56d20000af0e42ba0f"), defaultOrganization._id, diff --git a/app/models/dataset/Dataset.scala b/app/models/dataset/Dataset.scala index 2b995701c2c..98b2b5fc94d 100755 --- a/app/models/dataset/Dataset.scala +++ b/app/models/dataset/Dataset.scala @@ -732,7 +732,8 @@ case class MagWithPaths(layerName: String, realPath: Option[String], hasLocalData: Boolean) -case class DataSourceMagRow(_dataset: ObjectId, +case class DataSourceMagRow(_id: ObjectId, + _dataset: ObjectId, dataLayerName: String, mag: String, path: Option[String], @@ -766,22 +767,40 @@ class DatasetMagsDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionConte mags <- Fox.combined(rows.map(r => parseMag(r.mag))) ?~> "could not parse mag row" } yield mags + // TODO: Mention in docs & migration guide that postgres now needs to be at least 16+. 
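+  // The ROW_NUMBER() window below ranks rows per distinct mag path by dataset creation time and keeps only rn = 1, i.e. a path shared by several datasets is attributed to the oldest dataset and counted only once.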
+ def findAllStorageRelevantMags(organizationId: String, + dataStoreId: String, + datasetIdOpt: Option[ObjectId]): Fox[List[DataSourceMagRow]] = + for { + storageRelevantMags <- run(q"""SELECT * + FROM ( + SELECT mag._id, ds._id AS dataset_id, mag.dataLayerName, mag.mag, mag.path, mag.realPath, mag.hasLocalData, + ds._organization, ds.directoryName, ROW_NUMBER() OVER (PARTITION BY mag.path ORDER BY ds.created ASC) AS rn + FROM webknossos.dataset_mags AS mag + JOIN webknossos.datasets AS ds ON mag._dataset = ds._id + WHERE ds._organization = $organizationId + AND ds._dataStore = $dataStoreId + ${datasetIdOpt.map(datasetId => q"AND ds._id = $datasetId").getOrElse(q"")} + ) AS ranked + WHERE rn = 1;""".as[DataSourceMagRow]) + } yield storageRelevantMags.toList + def updateMags(datasetId: ObjectId, dataLayersOpt: Option[List[DataLayer]]): Fox[Unit] = { val clearQuery = q"DELETE FROM webknossos.dataset_mags WHERE _dataset = $datasetId".asUpdate val insertQueries = dataLayersOpt.getOrElse(List.empty).flatMap { layer: DataLayer => layer.magsOpt match { case Some(mags) => mags.map(mag => { - q"""INSERT INTO webknossos.dataset_mags(_dataset, dataLayerName, mag, path, axisOrder, channelIndex, credentialId) - VALUES($datasetId, ${layer.name}, ${mag.mag}, ${mag.path}, ${mag.axisOrder - .map(Json.toJson(_))}, ${mag.channelIndex}, ${mag.credentialId}) + q"""INSERT INTO webknossos.dataset_mags(_id, _dataset, dataLayerName, mag, path, axisOrder, channelIndex, credentialId) + VALUES(${ObjectId.generate}, $datasetId, ${layer.name}, ${mag.mag}, ${mag.path}, ${mag.axisOrder.map( + Json.toJson(_))}, ${mag.channelIndex}, ${mag.credentialId}) """.asUpdate }) case None => layer.resolutions.distinct.map { mag: Vec3Int => { - q"""INSERT INTO webknossos.dataset_mags(_dataset, dataLayerName, mag) - VALUES($datasetId, ${layer.name}, $mag)""".asUpdate + q"""INSERT INTO webknossos.dataset_mags(_id, _dataset, dataLayerName, mag) + VALUES(${ObjectId.generate}, $datasetId, ${layer.name}, $mag)""".asUpdate } } } @@ -812,14 +831,17 @@ class DatasetMagsDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionConte implicit def GetResultDataSourceMagRow: GetResult[DataSourceMagRow] = GetResult( r => - DataSourceMagRow(ObjectId(r.nextString()), - r.nextString(), - r.nextString(), - r.nextStringOption(), - r.nextStringOption(), - r.nextBoolean(), - r.nextString(), - r.nextString())) + DataSourceMagRow( + ObjectId(r.nextString()), + ObjectId(r.nextString()), + r.nextString(), + r.nextString(), + r.nextStringOption(), + r.nextStringOption(), + r.nextBoolean(), + r.nextString(), + r.nextString() + )) private def rowsToMagInfos(rows: Vector[DataSourceMagRow]): Fox[List[DataSourceMagInfo]] = for { @@ -833,7 +855,8 @@ class DatasetMagsDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionConte def findPathsForDatasetAndDatalayer(datasetId: ObjectId, dataLayerName: String): Fox[List[DataSourceMagInfo]] = for { - rows <- run(q"""SELECT _dataset, dataLayerName, mag, path, realPath, hasLocalData, _organization, directoryName + rows <- run( + q"""SELECT _id, _dataset, dataLayerName, mag, path, realPath, hasLocalData, _organization, directoryName FROM webknossos.dataset_mags INNER JOIN webknossos.datasets ON webknossos.dataset_mags._dataset = webknossos.datasets._id WHERE _dataset = $datasetId @@ -843,7 +866,8 @@ class DatasetMagsDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionConte def findAllByRealPath(realPath: String): Fox[List[DataSourceMagInfo]] = for { - rows <- run(q"""SELECT _dataset, dataLayerName, mag, path, 
realPath, hasLocalData, _organization, directoryName + rows <- run( + q"""SELECT _id, _dataset, dataLayerName, mag, path, realPath, hasLocalData, _organization, directoryName FROM webknossos.dataset_mags INNER JOIN webknossos.datasets ON webknossos.dataset_mags._dataset = webknossos.datasets._id WHERE realPath = $realPath""".as[DataSourceMagRow]) @@ -870,7 +894,7 @@ class DatasetMagsDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionConte def findAllByDatasetId(datasetId: ObjectId): Fox[Seq[(String, MagLocator)]] = for { rows <- run( - q"""SELECT _dataset, dataLayerName, mag, path, realPath, hasLocalData, axisOrder, channelIndex, credentialId + q"""SELECT _id, _dataset, dataLayerName, mag, path, realPath, hasLocalData, axisOrder, channelIndex, credentialId FROM webknossos.dataset_mags WHERE _dataset = $datasetId""".as[DatasetMagsRow]) mags <- Fox.combined(rows.map(parseMagLocator)) } yield rows.map(r => r.datalayername).zip(mags) @@ -1086,7 +1110,7 @@ class DatasetLayerAttachmentsDAO @Inject()(sqlClient: SqlClient)(implicit ec: Ex def findAllForDatasetAndDataLayerName(datasetId: ObjectId, layerName: String): Fox[AttachmentWrapper] = for { - rows <- run(q"""SELECT _dataset, layerName, name, path, type, dataFormat + rows <- run(q"""SELECT _id, _dataset, layerName, name, path, type, dataFormat FROM webknossos.dataset_layer_attachments WHERE _dataset = $datasetId AND layerName = $layerName""".as[DatasetLayerAttachmentsRow]) attachments <- parseAttachments(rows.toList) ?~> "Could not parse attachments" @@ -1094,8 +1118,8 @@ class DatasetLayerAttachmentsDAO @Inject()(sqlClient: SqlClient)(implicit ec: Ex def updateAttachments(datasetId: ObjectId, dataLayersOpt: Option[List[DataLayer]]): Fox[Unit] = { def insertQuery(attachment: LayerAttachment, layerName: String, fileType: String) = - q"""INSERT INTO webknossos.dataset_layer_attachments(_dataset, layerName, name, path, type, dataFormat) - VALUES($datasetId, $layerName, ${attachment.name}, ${attachment.path.toString}, $fileType::webknossos.LAYER_ATTACHMENT_TYPE, + q"""INSERT INTO webknossos.dataset_layer_attachments(_id, _dataset, layerName, name, path, type, dataFormat) + VALUES(${ObjectId.generate}, $datasetId, $layerName, ${attachment.name}, ${attachment.path.toString}, $fileType::webknossos.LAYER_ATTACHMENT_TYPE, ${attachment.dataFormat}::webknossos.LAYER_ATTACHMENT_DATAFORMAT)""".asUpdate val clearQuery = q"DELETE FROM webknossos.dataset_layer_attachments WHERE _dataset = $datasetId".asUpdate @@ -1119,6 +1143,26 @@ class DatasetLayerAttachmentsDAO @Inject()(sqlClient: SqlClient)(implicit ec: Ex } replaceSequentiallyAsTransaction(clearQuery, insertQueries) } + + def findAllStorageRelevantAttachments(organizationId: String, + dataStoreId: String, + datasetIdOpt: Option[ObjectId]): Fox[List[DatasetLayerAttachmentsRow]] = + for { + storageRelevantAttachments <- run(q"""SELECT * + FROM ( + SELECT + att.*, + ROW_NUMBER() OVER (PARTITION BY att.path ORDER BY ds.created ASC) AS rn + FROM webknossos.dataset_layer_attachments AS att + JOIN webknossos.datasets AS ds ON att._dataset = ds._id + WHERE ds._organization = $organizationId + AND ds._dataStore = $dataStoreId + ${datasetIdOpt + .map(datasetId => q"AND ds._id = $datasetId") + .getOrElse(q"")} + ) AS ranked + WHERE rn = 1;""".as[DatasetLayerAttachmentsRow]) + } yield storageRelevantAttachments.toList } class DatasetCoordinateTransformationsDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext) diff --git a/app/models/dataset/WKRemoteDataStoreClient.scala 
b/app/models/dataset/WKRemoteDataStoreClient.scala index a5c2a1037de..47befa28f16 100644 --- a/app/models/dataset/WKRemoteDataStoreClient.scala +++ b/app/models/dataset/WKRemoteDataStoreClient.scala @@ -11,7 +11,7 @@ import com.scalableminds.webknossos.datastore.explore.{ } import com.scalableminds.webknossos.datastore.models.{AdditionalCoordinate, RawCuboidRequest} import com.scalableminds.webknossos.datastore.rpc.RPC -import com.scalableminds.webknossos.datastore.services.DirectoryStorageReport +import com.scalableminds.webknossos.datastore.services.{PathStorageUsageRequest, PathStorageUsageResponse} import com.typesafe.scalalogging.LazyLogging import controllers.RpcTokenHolder import play.api.libs.json.JsObject @@ -71,12 +71,10 @@ class WKRemoteDataStoreClient(dataStore: DataStore, rpc: RPC) extends LazyLoggin private def urlEncode(text: String) = UriEncoding.encodePathSegment(text, "UTF-8") - def fetchStorageReport(organizationId: String, datasetName: Option[String]): Fox[List[DirectoryStorageReport]] = + def fetchStorageReports(organizationId: String, paths: List[String]): Fox[PathStorageUsageResponse] = rpc(s"${dataStore.url}/data/datasets/measureUsedStorage/${urlEncode(organizationId)}") - .addQueryString("token" -> RpcTokenHolder.webknossosToken) - .addQueryStringOptional("datasetName", datasetName) - .silent - .getWithJsonResponse[List[DirectoryStorageReport]] + .addQueryString("token" -> RpcTokenHolder.webknossosToken) // TODOM: Maybe make silent again + .postJsonWithJsonResponse[PathStorageUsageRequest, PathStorageUsageResponse](PathStorageUsageRequest(paths)) def hasSegmentIndexFile(datasetId: ObjectId, layerName: String)(implicit ec: ExecutionContext): Fox[Boolean] = { val cacheKey = (datasetId, layerName) diff --git a/app/models/organization/Organization.scala b/app/models/organization/Organization.scala index a620665ab57..50a94bc56de 100644 --- a/app/models/organization/Organization.scala +++ b/app/models/organization/Organization.scala @@ -3,7 +3,6 @@ package models.organization import com.scalableminds.util.accesscontext.DBAccessContext import com.scalableminds.util.time.Instant import com.scalableminds.util.tools.Fox -import com.scalableminds.webknossos.datastore.services.DirectoryStorageReport import com.scalableminds.webknossos.schema.Tables._ import models.team.PricingPlan import models.team.PricingPlan.PricingPlan @@ -33,6 +32,15 @@ case class Organization( isDeleted: Boolean = false ) +case class ArtifactStorageReport( + _organizationId: String, + _datasetId: ObjectId, + // Left for mags, right for attachments + _artifactId: Either[ObjectId, ObjectId], + path: String, + usedStorageBytes: Long +) + class OrganizationDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext) extends SQLDAO[Organization, OrganizationsRow, Organizations](sqlClient) { protected val collection = Organizations @@ -170,30 +178,41 @@ class OrganizationDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionCont _ <- run(q"UPDATE webknossos.organizations SET lastStorageScanTime = $time WHERE _id = $organizationId".asUpdate) } yield () - def upsertUsedStorage(organizationId: String, - dataStoreName: String, - usedStorageEntries: List[DirectoryStorageReport]): Fox[Unit] = { - val queries = usedStorageEntries.map(entry => q""" - WITH ds AS ( - SELECT _id - FROM webknossos.datasets_ - WHERE _organization = $organizationId - AND name = ${entry.datasetName} - LIMIT 1 - ) - INSERT INTO webknossos.organization_usedStorage( - _organization, _dataStore, _dataset, layerName, - 
magOrDirectoryName, usedStorageBytes, lastUpdated) - SELECT - $organizationId, $dataStoreName, ds._id, ${entry.layerName}, - ${entry.magOrDirectoryName}, ${entry.usedStorageBytes}, NOW() - FROM ds - ON CONFLICT (_organization, _dataStore, _dataset, layerName, magOrDirectoryName) - DO UPDATE - SET usedStorageBytes = ${entry.usedStorageBytes}, lastUpdated = NOW() - """.asUpdate) - for { - _ <- Fox.serialCombined(queries)(q => run(q)) + def upsertUsedStorage( + organizationId: String, + usedStorageEntries: List[ArtifactStorageReport] + ): Fox[Unit] = { + val reportUpsertQueries = usedStorageEntries.map { r => + r._artifactId match { + case Left(magId) => + q""" + INSERT INTO webknossos.organization_usedStorage ( + _organization, _dataset, _dataset_mag, _layer_attachment, path, usedStorageBytes, lastUpdated + ) + VALUES (${organizationId}, ${r._datasetId}, ${magId}, NULL, ${r.path}, ${r.usedStorageBytes}, NOW()) + ON CONFLICT ON CONSTRAINT unique_dataset_mag + DO UPDATE SET + path = EXCLUDED.path, + usedStorageBytes = EXCLUDED.usedStorageBytes, + lastUpdated = EXCLUDED.lastUpdated; + """.asUpdate + case Right(attachmentId) => + q""" + INSERT INTO webknossos.organization_usedStorage ( + _organization, _dataset, _dataset_mag, _layer_attachment, path, usedStorageBytes, lastUpdated + ) -- TODO: test why no s3 test dataset is included + VALUES (${organizationId}, ${r._datasetId}, NULL, ${attachmentId}, ${r.path}, ${r.usedStorageBytes}, NOW()) + ON CONFLICT ON CONSTRAINT unique_layer_attachment + DO UPDATE SET + path = EXCLUDED.path, + usedStorageBytes = EXCLUDED.usedStorageBytes, + lastUpdated = EXCLUDED.lastUpdated; + """.asUpdate + } + } + + for { + _ <- Fox.serialCombined(reportUpsertQueries)(q => run(q)) } yield () } diff --git a/app/models/storage/UsedStorageService.scala b/app/models/storage/UsedStorageService.scala index f9fd68d8d94..b4e857c8c87 100644 --- a/app/models/storage/UsedStorageService.scala +++ b/app/models/storage/UsedStorageService.scala @@ -2,18 +2,32 @@ package models.storage import org.apache.pekko.actor.ActorSystem import com.scalableminds.util.accesscontext.{DBAccessContext, GlobalAccessContext} +import com.scalableminds.util.geometry.Vec3Int +import com.scalableminds.util.objectid.ObjectId import com.scalableminds.util.time.Instant import com.scalableminds.util.tools.Fox import com.scalableminds.webknossos.datastore.helpers.IntervalScheduler import com.scalableminds.webknossos.datastore.rpc.RPC -import com.scalableminds.webknossos.datastore.services.DirectoryStorageReport +import com.scalableminds.webknossos.datastore.services.PathStorageReport import com.typesafe.scalalogging.LazyLogging -import models.dataset.{Dataset, DatasetService, DataStore, DataStoreDAO, WKRemoteDataStoreClient} -import models.organization.{Organization, OrganizationDAO} +import models.dataset.{ + DataSourceMagRow, + DataStore, + DataStoreDAO, + Dataset, + DatasetLayerAttachmentsDAO, + DatasetMagsDAO, + DatasetService, + WKRemoteDataStoreClient +} +import models.organization.{ArtifactStorageReport, Organization, OrganizationDAO} import com.scalableminds.util.tools.{Failure, Full} +import com.scalableminds.webknossos.schema.Tables.DatasetLayerAttachmentsRow import play.api.inject.ApplicationLifecycle import utils.WkConf +import utils.sql.SqlEscaping +import java.nio.file.Paths import javax.inject.Inject import scala.concurrent.ExecutionContext import scala.concurrent.duration._ @@ -23,10 +37,13 @@ class UsedStorageService @Inject()(val actorSystem: ActorSystem, organizationDAO: OrganizationDAO, 
datasetService: DatasetService, dataStoreDAO: DataStoreDAO, + datasetMagDAO: DatasetMagsDAO, + datasetLayerAttachmentsDAO: DatasetLayerAttachmentsDAO, rpc: RPC, config: WkConf)(implicit val ec: ExecutionContext) extends LazyLogging - with IntervalScheduler { + with IntervalScheduler + with SqlEscaping { /* Note that not every tick here will scan something, there is additional logic below: Every tick, et most scansPerTick organizations are scanned. @@ -38,6 +55,7 @@ class UsedStorageService @Inject()(val actorSystem: ActorSystem, private val pauseAfterEachOrganization = 5 seconds private val organizationCountToScanPerTick = config.WebKnossos.FetchUsedStorage.scansPerTick + private val MAX_STORAGE_PATH_REQUESTS_PER_REQUEST = 200 implicit private val ctx: DBAccessContext = GlobalAccessContext @@ -65,37 +83,132 @@ class UsedStorageService @Inject()(val actorSystem: ActorSystem, private def refreshStorageReports(organization: Organization, dataStores: List[DataStore]): Fox[Unit] = for { storageReportsByDataStore <- Fox.serialCombined(dataStores)(dataStore => - refreshStorageReports(dataStore, organization)) ?~> "Failed to fetch used storage reports" + getNewestStorageReports(dataStore, organization)) ?~> "Failed to fetch used storage reports" _ <- organizationDAO.deleteUsedStorage(organization._id) ?~> "Failed to delete outdated used storage entries" - _ <- Fox.serialCombined(storageReportsByDataStore.zip(dataStores))(storageForDatastore => - upsertUsedStorage(organization, storageForDatastore)) ?~> "Failed to upsert used storage reports into db" + allStorageReports = storageReportsByDataStore.flatten + _ <- Fox.runIfNonEmpty(allStorageReports)(organizationDAO.upsertUsedStorage(organization._id, allStorageReports)) ?~> "Failed to upsert used storage reports into db" _ <- organizationDAO.updateLastStorageScanTime(organization._id, Instant.now) ?~> "Failed to update last storage scan time in db" _ = Thread.sleep(pauseAfterEachOrganization.toMillis) } yield () - private def refreshStorageReports(dataStore: DataStore, - organization: Organization): Fox[List[DirectoryStorageReport]] = { - val dataStoreClient = new WKRemoteDataStoreClient(dataStore, rpc) - dataStoreClient.fetchStorageReport(organization._id, datasetName = None) + private def getNewestStorageReports(dataStore: DataStore, + organization: Organization, + datasetIdOpt: Option[ObjectId] = None): Fox[List[ArtifactStorageReport]] = + for { + relevantMagsForStorageReporting <- datasetMagDAO.findAllStorageRelevantMags(organization._id, + dataStore.name, + datasetIdOpt) + relevantPathsAndUnparsableMags = relevantMagsForStorageReporting.map(resolvePath) + unparsableMags = relevantPathsAndUnparsableMags.collect { case Right(mag) => mag }.distinctBy(_._dataset) + relevantMagsWithValidPaths = relevantPathsAndUnparsableMags.collect { case Left(magWithPaths) => magWithPaths } + relevantMagPaths = relevantPathsAndUnparsableMags.collect { case Left((_, paths)) => paths }.flatten + _ = Fox.runIfNonEmpty(unparsableMags)(logger.error( + s"Found dataset mags with unparsable mag literals in datastore ${dataStore.name} of organization ${organization._id} with dataset ids : ${unparsableMags + .map(_._dataset)}")) + relevantAttachments <- datasetLayerAttachmentsDAO.findAllStorageRelevantAttachments(organization._id, + dataStore.name, + datasetIdOpt) + pathToArtifactLookupMap = buildPathToStorageArtifactMap(relevantMagsWithValidPaths, relevantAttachments) + relevantAttachmentPaths = relevantAttachments.map(_.path) + relevantPaths = relevantMagPaths ++ 
relevantAttachmentPaths + reports <- fetchAllStorageReportsForPaths(organization._id, relevantPaths, dataStore) + storageReports = buildStorageReportsForPathReports(organization._id, reports, pathToArtifactLookupMap) + } yield storageReports + + private def resolvePath(mag: DataSourceMagRow): Either[(DataSourceMagRow, List[String]), DataSourceMagRow] = + mag.realPath match { + case Some(realPath) => Left((mag, List(realPath))) + case None => + mag.path match { + case Some(path) => Left((mag, List(path))) + case None => + val layerPath = Paths.get(mag.directoryName).resolve(mag.dataLayerName) + val parsedMagOpt = Vec3Int.fromList(parseArrayLiteral(mag.mag).map(_.toInt)) + + parsedMagOpt match { + case Some(parsedMag) => + Left( + ( + mag, + List( + layerPath.resolve(parsedMag.toMagLiteral(allowScalar = true)).toString, + layerPath.resolve(parsedMag.toMagLiteral(allowScalar = false)).toString + ) + ) + ) + case None => + Right(mag) + } + } + } + + private def buildPathToStorageArtifactMap( + magsWithValidPaths: List[(DataSourceMagRow, List[String])], + relevantAttachments: List[DatasetLayerAttachmentsRow] + ): Map[String, Either[DataSourceMagRow, DatasetLayerAttachmentsRow]] = { + + val magEntries: List[(String, Either[DataSourceMagRow, DatasetLayerAttachmentsRow])] = + magsWithValidPaths.flatMap { + case (mag, paths) => + paths.map(path => path -> Left(mag)) + } + + val attachmentEntries: List[(String, Either[DataSourceMagRow, DatasetLayerAttachmentsRow])] = + relevantAttachments.map(att => att.path -> Right(att)) + + (magEntries ++ attachmentEntries).toMap } - private def upsertUsedStorage(organization: Organization, - storageReportsForDatastore: (List[DirectoryStorageReport], DataStore)): Fox[Unit] = { - val dataStore = storageReportsForDatastore._2 - val storageReports = storageReportsForDatastore._1 - organizationDAO.upsertUsedStorage(organization._id, dataStore.name, storageReports) + private def fetchAllStorageReportsForPaths(organizationId: String, + relevantPaths: List[String], + dataStore: DataStore): Fox[List[PathStorageReport]] = { + val dataStoreClient = new WKRemoteDataStoreClient(dataStore, rpc) + for { + storageReportAnswers <- Fox.serialCombined(relevantPaths.grouped(MAX_STORAGE_PATH_REQUESTS_PER_REQUEST).toList)( + pathsBatch => + dataStoreClient.fetchStorageReports(organizationId, pathsBatch) ?~> "Could not fetch storage report") + storageReports = storageReportAnswers.flatMap(_.reports) + } yield storageReports } + private def buildStorageReportsForPathReports( + organizationId: String, + pathReports: List[PathStorageReport], + pathToArtifactMap: Map[String, Either[DataSourceMagRow, DatasetLayerAttachmentsRow]]) + : List[ArtifactStorageReport] = + pathReports.flatMap(pathReport => { + pathToArtifactMap(pathReport.path) match { + case Left(mag) => + Some( + ArtifactStorageReport(organizationId, + mag._dataset, + Left(mag._id), + pathReport.path, + pathReport.usedStorageBytes)) + case Right(attachment) => + val attachmentId = ObjectId.fromStringSync(attachment._Id) + attachmentId.flatMap( + id => + Some( + ArtifactStorageReport(organizationId, + ObjectId(attachment._Dataset), + Right(id), + pathReport.path, + pathReport.usedStorageBytes))) + + } + }) + def refreshStorageReportForDataset(dataset: Dataset): Fox[Unit] = for { + _ <- Fox.successful(()) dataStore <- datasetService.dataStoreFor(dataset) _ <- if (dataStore.reportUsedStorageEnabled) { - val dataStoreClient = new WKRemoteDataStoreClient(dataStore, rpc) for { organization <- 
organizationDAO.findOne(dataset._organization) - report <- dataStoreClient.fetchStorageReport(organization._id, Some(dataset.name)) + reports <- getNewestStorageReports(dataStore, organization, Some(dataset._id)) _ <- organizationDAO.deleteUsedStorageForDataset(dataset._id) - _ <- organizationDAO.upsertUsedStorage(organization._id, dataStore.name, report) + _ <- Fox.runIfNonEmpty(reports)(organizationDAO.upsertUsedStorage(organization._id, reports)) ?~> "Failed to upsert used storage reports into db" } yield () } else Fox.successful(()) } yield () diff --git a/conf/application.conf b/conf/application.conf index 0922f80aa3a..4a710129e20 100644 --- a/conf/application.conf +++ b/conf/application.conf @@ -95,9 +95,9 @@ webKnossos { user.timeout = 3 minutes } annotation.mutex.expiryTime = 2 minutes - fetchUsedStorage { - rescanInterval = 24 hours # do not scan organizations whose last scan is more recent than this - tickerInterval = 10 minutes # scan some organizations at each tick + fetchUsedStorage { # TODOM: undo these changes + rescanInterval = 10 seconds # do not scan organizations whose last scan is more recent than this + tickerInterval = 10 seconds # scan some organizations at each tick scansPerTick = 10 # scan x organizations at each tick } sampleOrganization { diff --git a/conf/evolutions/138-remote-storage-analysis.sql b/conf/evolutions/138-remote-storage-analysis.sql new file mode 100644 index 00000000000..564c03dcc54 --- /dev/null +++ b/conf/evolutions/138-remote-storage-analysis.sql @@ -0,0 +1,59 @@ +START TRANSACTION; + +do $$ begin ASSERT (select schemaVersion from webknossos.releaseInformation) = 137, 'Previous schema version mismatch'; end; $$ LANGUAGE plpgsql; + +-- 1. Add `_id` to dataset_layer_attachments +ALTER TABLE webknossos.dataset_layer_attachments + ADD COLUMN _id TEXT CONSTRAINT _id_objectId CHECK (_id ~ '^[0-9a-f]{24}$') UNIQUE; + +-- 2. Populate missing `_id`s in dataset_layer_attachments +UPDATE webknossos.dataset_layer_attachments +SET _id = webknossos.generate_object_id() +WHERE _id IS NULL; + +-- 3. Make `_id` column NOT NULL after filling it +ALTER TABLE webknossos.dataset_layer_attachments ALTER COLUMN _id SET NOT NULL; + +-- 4. Add `_id` to dataset_mags if it doesn't exist +ALTER TABLE webknossos.dataset_mags + ADD COLUMN _id TEXT CONSTRAINT _id_objectId CHECK (_id ~ '^[0-9a-f]{24}$') UNIQUE; + +-- 5. Populate missing `_id`s in dataset_mags +UPDATE webknossos.dataset_mags +SET _id = webknossos.generate_object_id() +WHERE _id IS NULL; + +-- 6. Make `_id` column NOT NULL after filling it +ALTER TABLE webknossos.dataset_mags ALTER COLUMN _id SET NOT NULL; + +-- 7. Drop old organization_usedStorage table if it exists +DROP TABLE IF EXISTS webknossos.organization_usedStorage; + +-- 8. Create the new organization_usedStorage table (kept in sync with tools/postgres/schema.sql: surrogate id primary key, since exactly one of _dataset_mag and _layer_attachment is always NULL, plus the named unique constraints referenced by the upsert in OrganizationDAO) +CREATE TABLE webknossos.organization_usedStorage ( + id TEXT PRIMARY KEY DEFAULT webknossos.generate_object_id() CONSTRAINT id_objectId CHECK (id ~ '^[0-9a-f]{24}$'), + _organization TEXT NOT NULL, + _dataset TEXT NOT NULL, + _dataset_mag TEXT CONSTRAINT _dataset_mag_objectId CHECK (_dataset_mag IS NULL OR _dataset_mag ~ '^[0-9a-f]{24}$'), + _layer_attachment TEXT CONSTRAINT _layer_attachment_objectId CHECK (_layer_attachment IS NULL OR _layer_attachment ~ '^[0-9a-f]{24}$'), + path TEXT NOT NULL, + usedStorageBytes BIGINT NOT NULL, + lastUpdated TIMESTAMPTZ NOT NULL DEFAULT NOW(), + CHECK ( + (_dataset_mag IS NOT NULL AND _layer_attachment IS NULL) + OR + (_dataset_mag IS NULL AND _layer_attachment IS NOT NULL) + ), + CONSTRAINT unique_dataset_mag UNIQUE (_dataset_mag), + CONSTRAINT unique_layer_attachment UNIQUE (_layer_attachment) +); + +-- 9. 
Reset all storage scan timestamps to fill the new webknossos.organization_usedStorage table +UPDATE webknossos.organizations +SET lastStorageScanTime = '1970-01-01T00:00:00.000Z'; + + +UPDATE webknossos.releaseInformation SET schemaVersion = 138; + +COMMIT TRANSACTION; diff --git a/conf/evolutions/reversions/138-remote-storage-analysis.sql b/conf/evolutions/reversions/138-remote-storage-analysis.sql new file mode 100644 index 00000000000..c71124945d2 --- /dev/null +++ b/conf/evolutions/reversions/138-remote-storage-analysis.sql @@ -0,0 +1,36 @@ +START TRANSACTION; + +do $$ begin ASSERT (select schemaVersion from webknossos.releaseInformation) = 138, 'Previous schema version mismatch'; end; $$ LANGUAGE plpgsql; + +-- 1. Drop the new organization_usedStorage table +DROP TABLE IF EXISTS webknossos.organization_usedStorage; + +-- 2. Recreate the old version of organization_usedStorage +CREATE TABLE webknossos.organization_usedStorage ( + _organization TEXT NOT NULL, + _dataStore TEXT NOT NULL, + _dataset TEXT CONSTRAINT _dataset_objectId CHECK (_dataset ~ '^[0-9a-f]{24}$') NOT NULL, + layerName TEXT NOT NULL, + magOrDirectoryName TEXT NOT NULL, + usedStorageBytes BIGINT NOT NULL, + lastUpdated TIMESTAMPTZ, + PRIMARY KEY (_organization, _dataStore, _dataset, layerName, magOrDirectoryName) +); + +-- 3. Drop `_id` column from dataset_layer_attachments +ALTER TABLE webknossos.dataset_layer_attachments + DROP COLUMN IF EXISTS _id; + +-- 4. Drop `_id` column from dataset_mags +ALTER TABLE webknossos.dataset_mags + DROP COLUMN IF EXISTS _id; + +-- 5. Reset all storage scan timestamps so that the recreated (old-format) webknossos.organization_usedStorage table gets filled again +UPDATE webknossos.organizations +SET lastStorageScanTime = '1970-01-01T00:00:00.000Z'; + +-- 6. Revert schema version +UPDATE webknossos.releaseInformation SET schemaVersion = 137; + +COMMIT TRANSACTION; diff --git a/test/backend/DataVaultTestSuite.scala b/test/backend/DataVaultTestSuite.scala index 3c8aba8210d..20f6fd925ea 100644 --- a/test/backend/DataVaultTestSuite.scala +++ b/test/backend/DataVaultTestSuite.scala @@ -225,6 +225,9 @@ class DataVaultTestSuite extends PlaySpec { override def listDirectory(path: VaultPath, maxItems: Int)(implicit ec: ExecutionContext): Fox[List[VaultPath]] = ??? + + override def getUsedStorageBytes(path: VaultPath)(implicit ec: ExecutionContext, tc: TokenContext): Fox[Long] = + ??? 
} "Uri has no trailing slash" should { diff --git a/tools/postgres/schema.sql b/tools/postgres/schema.sql index d37023310bf..24c1b033168 100644 --- a/tools/postgres/schema.sql +++ b/tools/postgres/schema.sql @@ -24,6 +24,32 @@ CREATE TABLE webknossos.releaseInformation ( INSERT INTO webknossos.releaseInformation(schemaVersion) values(137); COMMIT TRANSACTION; +-- ObjectId generation function taken and modified from https://thinhdanggroup.github.io/mongo-id-in-postgresql/ +CREATE SEQUENCE webknossos.objectid_sequence; + +CREATE FUNCTION webknossos.generate_object_id() RETURNS TEXT AS $$ +DECLARE + time_component TEXT; + machine_id TEXT; + process_id TEXT; + counter TEXT; + result TEXT; +BEGIN + -- Extract the current timestamp in seconds since the Unix epoch (4 bytes, 8 hex chars) + SELECT LPAD(TO_HEX(FLOOR(EXTRACT(EPOCH FROM clock_timestamp()))::BIGINT), 8, '0') INTO time_component; + -- Generate a machine identifier using the hash of the server IP (3 bytes, 6 hex chars) + SELECT SUBSTRING(md5(CAST(inet_server_addr() AS TEXT)) FROM 1 FOR 6) INTO machine_id; + -- Retrieve the current backend process ID, limited to 2 bytes (4 hex chars) + SELECT LPAD(TO_HEX(pg_backend_pid() % 65536), 4, '0') INTO process_id; + -- Generate a counter using a sequence, ensuring it's 3 bytes (6 hex chars) + SELECT LPAD(TO_HEX(nextval('webknossos.objectid_sequence')::BIGINT % 16777216), 6, '0') INTO counter; + -- Concatenate all parts to form a 24-character ObjectId + result := time_component || machine_id || process_id || counter; + + RETURN result; +END; +$$ LANGUAGE plpgsql; + CREATE TYPE webknossos.ANNOTATION_TYPE AS ENUM ('Task', 'Explorational', 'TracingBase', 'Orphan'); CREATE TYPE webknossos.ANNOTATION_STATE AS ENUM ('Active', 'Finished', 'Cancelled', 'Initializing'); @@ -166,6 +192,7 @@ CREATE TABLE webknossos.dataset_layer_additionalAxes( CREATE TYPE webknossos.LAYER_ATTACHMENT_TYPE AS ENUM ('agglomerate', 'connectome', 'segmentIndex', 'mesh', 'cumsum'); CREATE TYPE webknossos.LAYER_ATTACHMENT_DATAFORMAT AS ENUM ('hdf5', 'zarr3', 'json', 'neuroglancerPrecomputed'); CREATE TABLE webknossos.dataset_layer_attachments( + _id TEXT CONSTRAINT _id_objectId CHECK (_id ~ '^[0-9a-f]{24}$') NOT NULL, _dataset TEXT CONSTRAINT _dataset_objectId CHECK (_dataset ~ '^[0-9a-f]{24}$') NOT NULL, layerName TEXT NOT NULL, name TEXT NOT NULL, @@ -182,6 +209,7 @@ CREATE TABLE webknossos.dataset_allowedTeams( ); CREATE TABLE webknossos.dataset_mags( + _id TEXT CONSTRAINT _id_objectId CHECK (_id ~ '^[0-9a-f]{24}$') NOT NULL, _dataset TEXT CONSTRAINT _dataset_objectId CHECK (_dataset ~ '^[0-9a-f]{24}$') NOT NULL, dataLayerName TEXT, mag webknossos.VECTOR3 NOT NULL, @@ -352,16 +380,35 @@ CREATE TABLE webknossos.organizations( CONSTRAINT validOrganizationId CHECK (_id ~* '^[A-Za-z0-9\-_. ]+$') ); -CREATE TABLE webknossos.organization_usedStorage( + +CREATE TABLE webknossos.organization_usedStorage ( + -- Mostly ignored. Just needed because either of (_dataset_mag, _layer_attachment) must be null and part of a primary key can be null. 
+ id TEXT PRIMARY KEY + DEFAULT webknossos.generate_object_id() + CONSTRAINT id_objectId CHECK (id ~ '^[0-9a-f]{24}$'), _organization TEXT NOT NULL, - _dataStore TEXT NOT NULL, - _dataset TEXT CONSTRAINT _dataset_objectId CHECK (_dataset ~ '^[0-9a-f]{24}$') NOT NULL, - layerName TEXT NOT NULL, - magOrDirectoryName TEXT NOT NULL, - usedStorageBytes BIGINT NOT NULL, - lastUpdated TIMESTAMPTZ, - PRIMARY KEY(_organization, _dataStore, _dataset, layerName, magOrDirectoryName) + _dataset TEXT NOT NULL, + _dataset_mag TEXT CONSTRAINT _dataset_mag_objectId CHECK (_dataset_mag IS NULL OR _dataset_mag ~ '^[0-9a-f]{24}$'), + _layer_attachment TEXT CONSTRAINT _layer_attachment_objectId CHECK (_layer_attachment IS NULL OR _layer_attachment ~ '^[0-9a-f]{24}$'), + path TEXT NOT NULL, + usedStorageBytes BIGINT NOT NULL, + lastUpdated TIMESTAMPTZ NOT NULL DEFAULT NOW() + CHECK ( + (_dataset_mag IS NOT NULL AND _layer_attachment IS NULL) + OR + (_dataset_mag IS NULL AND _layer_attachment IS NOT NULL) + ), + CONSTRAINT unique_dataset_mag UNIQUE (_dataset_mag), + CONSTRAINT unique_layer_attachment UNIQUE (_layer_attachment) ); +-- Create unique indexes for _dataset_mag and _layer_attachment to enforce their uniqueness -> at most one entry per dataset_mag or layer_attachment +CREATE UNIQUE INDEX _dataset_mag_unique_idx + ON webknossos.organization_usedStorage(_dataset_mag) + WHERE _dataset_mag IS NOT NULL; + +CREATE UNIQUE INDEX _layer_attachment_unique_idx + ON webknossos.organization_usedStorage(_layer_attachment) + WHERE _layer_attachment IS NOT NULL; -- Create the enum types for transaction states and credit states -- Pending -> The transaction is a payment for a unfinished & not crashed job @@ -1026,32 +1073,6 @@ CREATE TRIGGER enforce_non_negative_balance_trigger BEFORE INSERT OR UPDATE ON webknossos.credit_transactions FOR EACH ROW EXECUTE PROCEDURE webknossos.enforce_non_negative_balance(); --- ObjectId generation function taken and modified from https://thinhdanggroup.github.io/mongo-id-in-postgresql/ -CREATE SEQUENCE webknossos.objectid_sequence; - -CREATE FUNCTION webknossos.generate_object_id() RETURNS TEXT AS $$ -DECLARE - time_component TEXT; - machine_id TEXT; - process_id TEXT; - counter TEXT; - result TEXT; -BEGIN - -- Extract the current timestamp in seconds since the Unix epoch (4 bytes, 8 hex chars) - SELECT LPAD(TO_HEX(FLOOR(EXTRACT(EPOCH FROM clock_timestamp()))::BIGINT), 8, '0') INTO time_component; - -- Generate a machine identifier using the hash of the server IP (3 bytes, 6 hex chars) - SELECT SUBSTRING(md5(CAST(inet_server_addr() AS TEXT)) FROM 1 FOR 6) INTO machine_id; - -- Retrieve the current backend process ID, limited to 2 bytes (4 hex chars) - SELECT LPAD(TO_HEX(pg_backend_pid() % 65536), 4, '0') INTO process_id; - -- Generate a counter using a sequence, ensuring it's 3 bytes (6 hex chars) - SELECT LPAD(TO_HEX(nextval('webknossos.objectid_sequence')::BIGINT % 16777216), 6, '0') INTO counter; - -- Concatenate all parts to form a 24-character ObjectId - result := time_component || machine_id || process_id || counter; - - RETURN result; -END; -$$ LANGUAGE plpgsql; - CREATE FUNCTION webknossos.hand_out_monthly_free_credits(free_credits_amount DECIMAL) RETURNS VOID AS $$ DECLARE diff --git a/util/src/main/scala/com/scalableminds/util/io/URIUtils.scala b/util/src/main/scala/com/scalableminds/util/io/URIUtils.scala new file mode 100644 index 00000000000..960b738ffbe --- /dev/null +++ b/util/src/main/scala/com/scalableminds/util/io/URIUtils.scala @@ -0,0 +1,21 @@ +package 
com.scalableminds.util.io + +import java.net.URI + +object URIUtils extends URIUtils + +trait URIUtils { + + def isSubpath(parentUri: URI, subUri: URI): Boolean = { + // Compare scheme and authority + if (parentUri.getScheme != subUri.getScheme || + parentUri.getAuthority != subUri.getAuthority) return false + + // Normalize paths + val configPath = parentUri.getPath.stripSuffix("/") + val targetPath = subUri.getPath.stripSuffix("/") + + // Ensure target path starts with the configured path followed by "/" or is exactly the same + targetPath == configPath || targetPath.startsWith(configPath + "/") + } +} diff --git a/util/src/main/scala/com/scalableminds/util/tools/Fox.scala b/util/src/main/scala/com/scalableminds/util/tools/Fox.scala index e5968d27578..615056d1ad3 100644 --- a/util/src/main/scala/com/scalableminds/util/tools/Fox.scala +++ b/util/src/main/scala/com/scalableminds/util/tools/Fox.scala @@ -178,6 +178,13 @@ object Fox extends FoxImplicits { def runIfOptionTrue[B](condition: Option[Boolean])(f: => Fox[B])(implicit ec: ExecutionContext): Fox[Option[B]] = runIf(condition.getOrElse(false))(f) + def runIfNonEmpty[A, B](list: List[A])(f: => B)(implicit ec: ExecutionContext): Fox[Option[B]] = + if(list.nonEmpty){ + Fox.successful(Some(f)) + } else { + Fox.successful(None) + } + def fillOption[A](opt: Option[A])(f: => Fox[A])(implicit ec: ExecutionContext): Fox[A] = opt match { case Some(a) => Fox.successful(a) diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/controllers/DataSourceController.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/controllers/DataSourceController.scala index 4a307798dfc..a4d6c6ffb45 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/controllers/DataSourceController.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/controllers/DataSourceController.scala @@ -393,19 +393,19 @@ class DataSourceController @Inject()( } } - def measureUsedStorage(organizationId: String, datasetDirectoryName: Option[String] = None): Action[AnyContent] = - Action.async { implicit request => + def measureUsedStorage(organizationId: String): Action[PathStorageUsageRequest] = + Action.async(validateJson[PathStorageUsageRequest]) { implicit request => log() { accessTokenService.validateAccessFromTokenContext(UserAccessRequest.administrateDataSources(organizationId)) { for { before <- Instant.nowFox - usedStorageInBytes: List[DirectoryStorageReport] <- storageUsageService.measureStorage(organizationId, - datasetDirectoryName) + pathStorageReports <- storageUsageService.measureStorageForPaths(request.body.paths, organizationId) _ = if (Instant.since(before) > (10 seconds)) { - val datasetLabel = datasetDirectoryName.map(n => s" dataset $n of").getOrElse("") - Instant.logSince(before, s"Measuring storage for$datasetLabel orga $organizationId", logger) + Instant.logSince(before, + s"Measuring storage for orga $organizationId for ${request.body.paths.length} paths.", + logger) } - } yield Ok(Json.toJson(usedStorageInBytes)) + } yield Ok(Json.toJson(PathStorageUsageResponse(reports = pathStorageReports))) } } } diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/DataVault.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/DataVault.scala index 7ef95b2e305..f7a01ef2861 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/DataVault.scala +++ 
b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/DataVault.scala @@ -10,4 +10,6 @@ trait DataVault { tc: TokenContext): Fox[(Array[Byte], Encoding.Value)] def listDirectory(path: VaultPath, maxItems: Int)(implicit ec: ExecutionContext): Fox[List[VaultPath]] + + def getUsedStorageBytes(path: VaultPath)(implicit ec: ExecutionContext, tc: TokenContext): Fox[Long] } diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/FileSystemDataVault.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/FileSystemDataVault.scala index 45c6e41f692..04d52a4b39c 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/FileSystemDataVault.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/FileSystemDataVault.scala @@ -1,9 +1,11 @@ package com.scalableminds.webknossos.datastore.datavault import com.scalableminds.util.accesscontext.TokenContext -import com.scalableminds.util.tools.Fox +import com.scalableminds.util.tools.Box.tryo +import com.scalableminds.util.tools.{Fox, FoxImplicits} import com.scalableminds.webknossos.datastore.storage.DataVaultService import com.scalableminds.util.tools.{Box, Full} +import org.apache.commons.io.FileUtils import org.apache.commons.lang3.builder.HashCodeBuilder import java.nio.ByteBuffer @@ -13,7 +15,7 @@ import java.util.stream.Collectors import scala.concurrent.{ExecutionContext, Promise} import scala.jdk.CollectionConverters._ -class FileSystemDataVault extends DataVault { +class FileSystemDataVault extends DataVault with FoxImplicits{ override def readBytesAndEncoding(path: VaultPath, range: RangeSpecifier)( implicit ec: ExecutionContext, @@ -91,6 +93,12 @@ class FileSystemDataVault extends DataVault { } else List.empty } yield listing + override def getUsedStorageBytes(path: VaultPath)(implicit ec: ExecutionContext, tc: TokenContext): Fox[Long] = + for { + localPath <- vaultPathToLocalPath(path) + usedStorageBytes <- tryo(FileUtils.sizeOfDirectoryAsBigInteger(localPath.toFile).longValue).toFox ?~> "Failed to get used storage bytes" + } yield usedStorageBytes + private def vaultPathToLocalPath(path: VaultPath)(implicit ec: ExecutionContext): Fox[Path] = { val uri = path.toUri for { diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/GoogleCloudDataVault.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/GoogleCloudDataVault.scala index a72364cd6dc..03b2a3760e0 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/GoogleCloudDataVault.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/GoogleCloudDataVault.scala @@ -12,7 +12,7 @@ import java.io.ByteArrayInputStream import java.net.URI import java.nio.ByteBuffer import scala.concurrent.ExecutionContext -import scala.jdk.CollectionConverters.IterableHasAsScala +import scala.jdk.CollectionConverters.{IterableHasAsScala, IteratorHasAsScala} class GoogleCloudDataVault(uri: URI, credential: Option[GoogleServiceAccountCredential]) extends DataVault @@ -88,6 +88,16 @@ class GoogleCloudDataVault(uri: URI, credential: Option[GoogleServiceAccountCred paths }).toFox + override def getUsedStorageBytes(path: VaultPath)(implicit ec: ExecutionContext, tc: TokenContext): Fox[Long] = + tryo({ + val objName = path.toUri.getPath.tail + val blobs = + storage.list(bucket, + Storage.BlobListOption.prefix(objName) /* no currentDirectory(); Do deep recursive listing 
*/ ) + val totalSize = blobs.iterateAll().iterator().asScala.map(_.getSize).foldLeft(0L)(_ + _) + totalSize + }).toFox + private def getUri = uri private def getCredential = credential diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/HttpsDataVault.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/HttpsDataVault.scala index e263260d427..d5fa220e236 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/HttpsDataVault.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/HttpsDataVault.scala @@ -54,6 +54,10 @@ class HttpsDataVault(credential: Option[DataVaultCredential], ws: WSClient, data // HTTP file listing is currently not supported. Fox.successful(List.empty) + override def getUsedStorageBytes(path: VaultPath)(implicit ec: ExecutionContext, tc: TokenContext): Fox[Long] = + // paid HTTP file storage is not supported. + Fox.successful(0L) + private val headerInfoCache: AlfuCache[URI, (Boolean, Long)] = AlfuCache() private def getHeaderInformation(uri: URI)(implicit ec: ExecutionContext): Fox[(Boolean, Long)] = diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/S3DataVault.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/S3DataVault.scala index 11393142b8b..0d17bf9afaa 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/S3DataVault.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/S3DataVault.scala @@ -136,6 +136,32 @@ class S3DataVault(s3AccessKeyCredential: Option[S3AccessKeyCredential], } yield s3SubPrefixes.map(_.prefix()) } + override def getUsedStorageBytes(path: VaultPath)(implicit ec: ExecutionContext, tc: TokenContext): Fox[Long] = { + def fetchBatch(prefixKey: String, + client: S3AsyncClient, + continuationToken: Option[String], + alreadyMeasuredSize: Long): Fox[Long] = { + val builder = ListObjectsV2Request.builder().bucket(bucketName).prefix(prefixKey).maxKeys(1000) + continuationToken.foreach(builder.continuationToken) + val request = builder.build() + + for { + objectListing <- notFoundToFailure(client.listObjectsV2(request).asScala) + totalCurrentSize = objectListing.contents().asScala.map(_.size()).foldLeft(alreadyMeasuredSize)(_ + _) + result <- if (objectListing.isTruncated) + fetchBatch(prefixKey, client, Option(objectListing.nextContinuationToken()), totalCurrentSize) + else + Fox.successful(totalCurrentSize) + } yield result + } + + for { + prefixKey <- S3DataVault.objectKeyFromUri(path.toUri).toFox + client <- clientFox + totalSize <- fetchBatch(prefixKey, client, None, 0) + } yield totalSize + } + private def getUri = uri private def getCredential = s3AccessKeyCredential diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/VaultPath.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/VaultPath.scala index 03f69c4e336..20780f6a58f 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/VaultPath.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/VaultPath.scala @@ -24,6 +24,8 @@ class VaultPath(uri: URI, dataVault: DataVault) extends LazyLogging with FoxImpl decoded <- decode(bytesAndEncoding) ?~> s"Failed to decode ${bytesAndEncoding._2}-encoded response." 
} yield decoded + def getUsedStorageBytes(implicit ec: ExecutionContext, tc: TokenContext): Fox[Long] = dataVault.getUsedStorageBytes(this) + def readLastBytes(byteCount: Int)(implicit ec: ExecutionContext, tc: TokenContext): Fox[Array[Byte]] = for { bytesAndEncoding <- dataVault.readBytesAndEncoding(this, SuffixLength(byteCount)) ?=> "Failed to read from vault path" diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DSUsedStorageService.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DSUsedStorageService.scala index d877424f042..36b29c915ab 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DSUsedStorageService.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DSUsedStorageService.scala @@ -1,16 +1,15 @@ package com.scalableminds.webknossos.datastore.services -import com.scalableminds.util.geometry.Vec3Int -import com.scalableminds.util.io.PathUtils -import com.scalableminds.util.tools.{Fox, FoxImplicits} +import com.scalableminds.util.accesscontext.TokenContext +import com.scalableminds.util.io.URIUtils +import com.scalableminds.util.tools.{Fox, FoxImplicits, Full} import com.scalableminds.webknossos.datastore.DataStoreConfig import com.typesafe.scalalogging.LazyLogging -import com.scalableminds.util.tools.Box -import com.scalableminds.util.tools.Box.tryo -import org.apache.commons.io.FileUtils +import com.scalableminds.webknossos.datastore.storage.{DataVaultService, RemoteSourceDescriptor} import play.api.libs.json.{Json, OFormat} -import java.nio.file.{Files, Path} +import java.net.URI +import java.nio.file.Path import javax.inject.Inject import scala.concurrent.ExecutionContext @@ -26,15 +25,64 @@ object DirectoryStorageReport { implicit val jsonFormat: OFormat[DirectoryStorageReport] = Json.format[DirectoryStorageReport] } -class DSUsedStorageService @Inject()(config: DataStoreConfig)(implicit ec: ExecutionContext) +case class PathStorageUsageRequest(paths: List[String]) +object PathStorageUsageRequest { + implicit val jsonFormat: OFormat[PathStorageUsageRequest] = Json.format[PathStorageUsageRequest] +} + +case class PathStorageReport( + path: String, + usedStorageBytes: Long +) +object PathStorageReport { + implicit val jsonFormat: OFormat[PathStorageReport] = Json.format[PathStorageReport] +} + +case class PathStorageUsageResponse(reports: List[PathStorageReport]) +object PathStorageUsageResponse { + implicit val jsonFormat: OFormat[PathStorageUsageResponse] = Json.format[PathStorageUsageResponse] +} + +class DSUsedStorageService @Inject()(config: DataStoreConfig, dataVaultService: DataVaultService) extends FoxImplicits - with LazyLogging { + with LazyLogging + with URIUtils { private val baseDir: Path = Path.of(config.Datastore.baseDirectory) - private def noSymlinksFilter(p: Path) = !Files.isSymbolicLink(p) + def measureStorageForPaths(paths: List[String], organizationId: String)( + implicit ec: ExecutionContext, + tc: TokenContext): Fox[List[PathStorageReport]] = { + val organizationDirectory = baseDir.resolve(organizationId) + val pathsAsURIs = paths.map(new URI(_)) + val pathsWithAbsoluteURIs = pathsAsURIs.map(uri => { + if (uri.getScheme == null || uri.getScheme == DataVaultService.schemeFile) { + organizationDirectory.resolve(uri.getPath).normalize().toAbsolutePath.toUri + } else + uri + }) + + // Check to only measure remote paths that are part of a vault that is configured + val absolutePathsToMeasure = pathsWithAbsoluteURIs.filter(uri 
=> + uri.getScheme == DataVaultService.schemeFile || config.Datastore.DataVaults.credentials.exists(vault => + isSubpath(new URI(vault.getString("name")), uri))) + for { + vaultPaths <- Fox.serialCombined(absolutePathsToMeasure)(uri => + dataVaultService.getVaultPath(RemoteSourceDescriptor(uri, None))) + usedBytes <- Fox.fromFuture(Fox.serialSequence(vaultPaths)(vaultPath => vaultPath.getUsedStorageBytes)) + pathsWithStorageUsedBox = paths.zip(usedBytes) + successfulStorageUsedBoxes = pathsWithStorageUsedBox.collect { + case (path, Full(usedStorageBytes)) => + PathStorageReport(path, usedStorageBytes) + } + failedPaths = pathsWithStorageUsedBox.filter(p => p._2.isEmpty).map(_._1) + _ = Fox.runIfNonEmpty(failedPaths)( + logger.error( + s"Failed to measure storage for paths ${paths.length} paths: ${failedPaths.take(5).mkString(", ")}.")) + } yield successfulStorageUsedBoxes + } - def measureStorage(organizationId: String, datasetName: Option[String])( + /*def measureStorage(organizationId: String, datasetName: Option[String])( implicit ec: ExecutionContext): Fox[List[DirectoryStorageReport]] = { val organizationDirectory = baseDir.resolve(organizationId) if (Files.exists(organizationDirectory)) { @@ -42,6 +90,7 @@ class DSUsedStorageService @Inject()(config: DataStoreConfig)(implicit ec: Execu } else Fox.successful(List()) } + def measureStorage(organizationId: String, datasetName: Option[String], organizationDirectory: Path)( implicit ec: ExecutionContext): Fox[List[DirectoryStorageReport]] = { def selectedDatasetFilter(p: Path) = datasetName.forall(name => p.getFileName.toString == name) @@ -97,6 +146,6 @@ class DSUsedStorageService @Inject()(config: DataStoreConfig)(implicit ec: Execu } def measureStorage(path: Path): Box[Long] = - tryo(FileUtils.sizeOfDirectoryAsBigInteger(path.toFile).longValue) + tryo(FileUtils.sizeOfDirectoryAsBigInteger(path.toFile).longValue)*/ } diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/ZarrStreamingService.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/ZarrStreamingService.scala new file mode 100644 index 00000000000..54813a70cc5 --- /dev/null +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/ZarrStreamingService.scala @@ -0,0 +1,224 @@ +package com.scalableminds.webknossos.datastore.services + +import com.google.inject.Inject +import com.scalableminds.util.accesscontext.TokenContext +import com.scalableminds.util.geometry.Vec3Int +import com.scalableminds.util.tools.{Fox, FoxImplicits} +import com.scalableminds.webknossos.datastore.dataformats.MagLocator +import com.scalableminds.webknossos.datastore.dataformats.layers.{ZarrDataLayer, ZarrLayer, ZarrSegmentationLayer} +import com.scalableminds.webknossos.datastore.dataformats.zarr.{Zarr3OutputHelper, ZarrCoordinatesParser} +import com.scalableminds.webknossos.datastore.datareaders.zarr._ +import com.scalableminds.webknossos.datastore.datareaders.zarr3.{NgffZarr3GroupHeader, Zarr3ArrayHeader} +import com.scalableminds.webknossos.datastore.models.annotation.AnnotationLayerType +import com.scalableminds.webknossos.datastore.models.datasource._ +import com.scalableminds.webknossos.datastore.models.requests._ +import com.scalableminds.webknossos.datastore.models.VoxelPosition +import play.api.i18n.{Messages, MessagesProvider} +import play.api.libs.json.{JsValue, Json} + +import scala.concurrent.ExecutionContext +import com.scalableminds.webknossos.datastore.datareaders.AxisOrder + +class ZarrStreamingService 
@Inject()( + datasetCache: DatasetCache, + binaryDataServiceHolder: BinaryDataServiceHolder, + remoteWebknossosClient: DSRemoteWebknossosClient, +)(implicit ec: ExecutionContext) + extends Zarr3OutputHelper + with FoxImplicits { + + val binaryDataService: BinaryDataService = binaryDataServiceHolder.binaryDataService + + def getHeader( + dataSource: DataSource, + dataLayer: DataLayer, + ): NgffMetadata = + NgffMetadata.fromNameVoxelSizeAndMags(dataLayer.name, dataSource.scale, dataLayer.sortedMags) + + def getGroupHeader( + dataSource: DataSource, + dataLayer: DataLayer + ): NgffZarr3GroupHeader = { + val omeNgffHeaderV0_5 = NgffMetadataV0_5.fromNameVoxelSizeAndMags(dataLayer.name, + dataSource.scale, + dataLayer.sortedMags, + dataLayer.additionalAxes) + + val zarr3GroupHeader = NgffZarr3GroupHeader(3, "group", omeNgffHeaderV0_5) + zarr3GroupHeader + } + + def zGroupJson: JsValue = Json.toJson(NgffGroupHeader(zarr_format = 2)) + + def getZarrDataSource( + dataSource: DataSource, + zarrVersion: Int + ): DataSource = { + val dataLayers = dataSource.dataLayers + val zarrLayers = dataLayers.map(convertLayerToZarrLayer(_, zarrVersion)) + val zarrSource = GenericDataSource[DataLayer](dataSource.id, zarrLayers, dataSource.scale) + zarrSource + } + + private def convertLayerToZarrLayer(layer: DataLayer, zarrVersion: Int): ZarrLayer = { + val dataFormat = if (zarrVersion == 2) DataFormat.zarr else DataFormat.zarr3 + layer match { + case s: SegmentationLayer => + val rank = s.additionalAxes.map(_.length).getOrElse(0) + 4 + ZarrSegmentationLayer( + s.name, + s.boundingBox, + s.elementClass, + mags = s.sortedMags.map( + m => + MagLocator(m, + Some(s"./${s.name}/${m.toMagLiteral(allowScalar = true)}"), + None, + Some(AxisOrder.cAdditionalxyz(rank)), + None, + None)), + mappings = s.mappings, + largestSegmentId = s.largestSegmentId, + numChannels = Some(if (s.elementClass == ElementClass.uint24) 3 else 1), + defaultViewConfiguration = s.defaultViewConfiguration, + adminViewConfiguration = s.adminViewConfiguration, + coordinateTransformations = s.coordinateTransformations, + additionalAxes = s.additionalAxes.map(reorderAdditionalAxes), + dataFormat = dataFormat + ) + case d: DataLayer => + val rank = d.additionalAxes.map(_.length).getOrElse(0) + 4 + ZarrDataLayer( + d.name, + d.category, + d.boundingBox, + d.elementClass, + mags = d.sortedMags.map( + m => + MagLocator(m, + Some(s"./${d.name}/${m.toMagLiteral(allowScalar = true)}"), + None, + Some(AxisOrder.cAdditionalxyz(rank)), + None, + None)), + numChannels = Some(if (d.elementClass == ElementClass.uint24) 3 else 1), + defaultViewConfiguration = d.defaultViewConfiguration, + adminViewConfiguration = d.adminViewConfiguration, + coordinateTransformations = d.coordinateTransformations, + additionalAxes = d.additionalAxes.map(reorderAdditionalAxes), + dataFormat = dataFormat + ) + } + } + + def rawZarrCube( + dataSource: DataSource, + dataLayer: DataLayer, + mag: String, + coordinates: String + )(implicit m: MessagesProvider, tc: TokenContext): Fox[Array[Byte]] = + for { + _ <- Fox.successful(()) + reorderedAdditionalAxes = dataLayer.additionalAxes.map(reorderAdditionalAxes) + (x, y, z, additionalCoordinates) <- ZarrCoordinatesParser.parseNDimensionalDotCoordinates( + coordinates, + reorderedAdditionalAxes) ?~> "zarr.invalidChunkCoordinates" + magParsed <- Vec3Int.fromMagLiteral(mag, allowScalar = true).toFox ?~> Messages("dataLayer.invalidMag", mag) + dataLayerName = dataLayer.name + _ <- Fox.fromBool(dataLayer.containsMag(magParsed)) ?~> 
Messages("dataLayer.wrongMag", dataLayerName, mag) + cubeSize = DataLayer.bucketLength + request = DataServiceDataRequest( + Some(dataSource.id), + dataLayer, + Cuboid( + topLeft = VoxelPosition(x * cubeSize * magParsed.x, + y * cubeSize * magParsed.y, + z * cubeSize * magParsed.z, + magParsed), + width = cubeSize, + height = cubeSize, + depth = cubeSize + ), + DataServiceRequestSettings(halfByte = false, additionalCoordinates = additionalCoordinates) + ) + (data, notFoundIndices) <- binaryDataService.handleDataRequests(List(request)) + _ <- Fox.fromBool(notFoundIndices.isEmpty) ~> "zarr.chunkNotFound" + } yield data + + def getZArray( + dataLayer: DataLayer, + mag: String + )(implicit m: MessagesProvider): Fox[ZarrHeader] = + for { + magParsed <- Vec3Int.fromMagLiteral(mag, allowScalar = true).toFox ?~> Messages("dataLayer.invalidMag", mag) + dataLayerName = dataLayer.name + _ <- Fox.fromBool(dataLayer.containsMag(magParsed)) ?~> Messages("dataLayer.wrongMag", dataLayerName, mag) + } yield ZarrHeader.fromLayer(dataLayer, magParsed) + + def requestZarrJsonForMag( + dataSource: DataSource, + dataLayer: DataLayer, + mag: String + )(implicit m: MessagesProvider): Fox[Zarr3ArrayHeader] = + for { + magParsed <- Vec3Int.fromMagLiteral(mag, allowScalar = true).toFox ?~> Messages("dataLayer.invalidMag", mag) + dataLayerName = dataLayer.name + _ <- Fox.fromBool(dataLayer.containsMag(magParsed)) ?~> Messages("dataLayer.wrongMag", dataLayerName, mag) + zarrHeader = Zarr3ArrayHeader.fromDataLayer(dataLayer, magParsed) + } yield zarrHeader + + def dataLayerDirectoryContents( + dataSource: DataSource, + dataLayer: DataLayer, + zarrVersion: Int + ): Fox[List[String]] = + for { + _ <- Fox.successful(()) + mags = dataLayer.sortedMags + additionalFiles = if (zarrVersion == 2) + List(NgffMetadata.FILENAME_DOT_ZATTRS, NgffGroupHeader.FILENAME_DOT_ZGROUP) + else List(Zarr3ArrayHeader.FILENAME_ZARR_JSON) + } yield (additionalFiles ++ mags.map(_.toMagLiteral(allowScalar = true))) + + def dataLayerMagDirectoryContents( + dataSource: DataSource, + dataLayer: DataLayer, + mag: String, + zarrVersion: Int + )(implicit m: MessagesProvider): Fox[List[String]] = + for { + magParsed <- Vec3Int.fromMagLiteral(mag, allowScalar = true).toFox ?~> Messages("dataLayer.invalidMag", mag) + dataLayerName = dataLayer.name + _ <- Fox.fromBool(dataLayer.containsMag(magParsed)) ?~> Messages("dataLayer.wrongMag", dataLayerName, mag) + additionalEntries = if (zarrVersion == 2) List(ZarrHeader.FILENAME_DOT_ZARRAY) + else List(Zarr3ArrayHeader.FILENAME_ZARR_JSON) + } yield additionalEntries + + def dataSourceDirectoryContents( + dataSource: DataSource, + zarrVersion: Int + ): Fox[List[String]] = + for { + _ <- Fox.successful(()) + layerNames = dataSource.dataLayers.map((dataLayer: DataLayer) => dataLayer.name) + additionalVersionDependantFiles = if (zarrVersion == 2) List(NgffGroupHeader.FILENAME_DOT_ZGROUP) + else List.empty + } yield (layerNames ++ additionalVersionDependantFiles) + + def dataSourceDirectoryContentsPrivateLink(accessToken: String, zarrVersion: Int)( + implicit tc: TokenContext): Fox[List[String]] = + for { + annotationSource <- remoteWebknossosClient.getAnnotationSource(accessToken) + dataSource <- datasetCache.getById(annotationSource.datasetId) + annotationLayerNames = annotationSource.annotationLayers.filter(_.typ == AnnotationLayerType.Volume).map(_.name) + dataSourceLayerNames = dataSource.dataLayers + .map((dataLayer: DataLayer) => dataLayer.name) + .filter(!annotationLayerNames.contains(_)) + layerNames = 
annotationLayerNames ++ dataSourceLayerNames + additionalEntries = if (zarrVersion == 2) + List(GenericDataSource.FILENAME_DATASOURCE_PROPERTIES_JSON, NgffGroupHeader.FILENAME_DOT_ZGROUP) + else + List(GenericDataSource.FILENAME_DATASOURCE_PROPERTIES_JSON) + } yield additionalEntries ++ layerNames + +} diff --git a/webknossos-datastore/conf/datastore.latest.routes b/webknossos-datastore/conf/datastore.latest.routes index 09a88987bf0..2212e75bdf1 100644 --- a/webknossos-datastore/conf/datastore.latest.routes +++ b/webknossos-datastore/conf/datastore.latest.routes @@ -107,7 +107,7 @@ POST /datasets/reserveUpload POST /datasets/reserveManualUpload @com.scalableminds.webknossos.datastore.controllers.DataSourceController.reserveManualUpload() POST /datasets/finishUpload @com.scalableminds.webknossos.datastore.controllers.DataSourceController.finishUpload() POST /datasets/cancelUpload @com.scalableminds.webknossos.datastore.controllers.DataSourceController.cancelUpload() -GET /datasets/measureUsedStorage/:organizationId @com.scalableminds.webknossos.datastore.controllers.DataSourceController.measureUsedStorage(organizationId: String, datasetDirectoryName: Option[String]) +POST /datasets/measureUsedStorage/:organizationId @com.scalableminds.webknossos.datastore.controllers.DataSourceController.measureUsedStorage(organizationId: String) GET /datasets/:datasetId/readInboxDataSource @com.scalableminds.webknossos.datastore.controllers.DataSourceController.readInboxDataSource(datasetId: ObjectId) PUT /datasets/:datasetId @com.scalableminds.webknossos.datastore.controllers.DataSourceController.update(datasetId: ObjectId) POST /datasets/:organizationId/:datasetName @com.scalableminds.webknossos.datastore.controllers.DataSourceController.add(organizationId: String, datasetName: String, folderId: Option[String])
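
Note on the changed route above: measureUsedStorage is now a POST that takes the list of paths to measure in the request body and returns one report per path that could be measured; paths that fail to measure are simply omitted (see the failedPaths handling in DSUsedStorageService.measureStorageForPaths). A minimal sketch of the wire format, assuming the Play JSON formats of PathStorageUsageRequest and PathStorageUsageResponse from this diff; the organization id, token, paths and byte counts are placeholder values:

POST /data/datasets/measureUsedStorage/sample_organization?token=<datastore-token>
  {"paths": ["sample_organization/sample_dataset/color/1", "s3://my-bucket/some/prefix/segmentation/mags/2-2-1"]}

Response:
  {"reports": [{"path": "sample_organization/sample_dataset/color/1", "usedStorageBytes": 1048576}, {"path": "s3://my-bucket/some/prefix/segmentation/mags/2-2-1", "usedStorageBytes": 52428800}]}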