From 418f5f7f3e10492b2a8c3d56052355347d1a5823 Mon Sep 17 00:00:00 2001 From: Brian Date: Tue, 2 Oct 2018 20:55:38 -0700 Subject: [PATCH 1/6] Added task manager flow --- NAMESPACE | 1 + R/doAzureParallel.R | 8 +++++--- R/utility-job.R | 6 ++---- R/utility-string.R | 4 ++++ inst/startup/install_custom.R | 10 +++++++++- 5 files changed, 21 insertions(+), 8 deletions(-) diff --git a/NAMESPACE b/NAMESPACE index c2bbb1a0..adc5c55a 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -1,5 +1,6 @@ # Generated by roxygen2: do not edit by hand +export(TaskWorkflowManager) export(createOutputFile) export(deleteJob) export(deleteStorageContainer) diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R index 4302b76e..32430086 100644 --- a/R/doAzureParallel.R +++ b/R/doAzureParallel.R @@ -626,7 +626,7 @@ setHttpTraffic <- function(value = FALSE) { ) mergeOutput <- append(obj$options$azure$outputFiles, mergeOutput) - BatchUtilitiesOperations$addTask( + task <- TaskWorkflowManager$createTask( jobId = id, taskId = taskId, rCommand = sprintf( @@ -644,12 +644,14 @@ setHttpTraffic <- function(value = FALSE) { maxRetryCount = maxTaskRetryCount ) - cat("\r", sprintf("Submitting tasks (%s/%s)", i, length(endIndices)), sep = "") + cat("\r", sprintf("Creating tasks (%s/%s)", i, length(endIndices)), sep = "") flush.console() - return(taskId) + return(task) }) + TaskWorkflowManager$addTaskCollection(id, tasks) + if (enableCloudCombine) { cat("\nSubmitting merge task") taskDependencies <- list(taskIdRanges = list(list( diff --git a/R/utility-job.R b/R/utility-job.R index dfe0398d..e2e41548 100644 --- a/R/utility-job.R +++ b/R/utility-job.R @@ -499,9 +499,7 @@ waitForTasksToComplete <- for (i in 1:length(failedTasks$value)) { if (!is.null(failedTasks$value[[i]]$executionInfo$result) && - grepl(failedTasks$value[[i]]$executionInfo$result, - "failure", - ignore.case = TRUE)) { + compare(failedTasks$value[[i]]$executionInfo$result, "failure")) { tasksFailureWarningLabel <- paste0(tasksFailureWarningLabel, sprintf("%s\n", failedTasks$value[[i]]$id)) @@ -559,7 +557,7 @@ waitForTasksToComplete <- next } - if (grepl(mergeTask$executionInfo$result, "success", ignore.case = TRUE)) { + if (compare(mergeTask$executionInfo$result, "success")) { cat(" Completed.") break } diff --git a/R/utility-string.R b/R/utility-string.R index 95fda84a..09e86992 100644 --- a/R/utility-string.R +++ b/R/utility-string.R @@ -115,3 +115,7 @@ printCluster <- function(cluster, resourceFiles = list()) { } cat(strrep('=', options("width")), fill = TRUE) } + +compare <- function(a, b) { + return(grepl(a, b, ignore.case = TRUE)) +} diff --git a/inst/startup/install_custom.R b/inst/startup/install_custom.R index 7d39d60d..02b732ff 100644 --- a/inst/startup/install_custom.R +++ b/inst/startup/install_custom.R @@ -18,6 +18,13 @@ if (length(args) > 1) { } } +if (length(args) < 1) { + stop("Given arguments were not passed,", + "install_custom.R file_share_directory pattern") +} + +directory <- args[1] + devtoolsPackage <- "devtools" if (!require(devtoolsPackage, character.only = TRUE)) { install.packages(devtoolsPackage) @@ -25,7 +32,8 @@ if (!require(devtoolsPackage, character.only = TRUE)) { } packageDirs <- list.files( - path = tempDir, + path = directory, + pattern = pattern, full.names = TRUE, recursive = FALSE) From 1ccf6275497494fde6eb11efb07b16191f498f95 Mon Sep 17 00:00:00 2001 From: Brian Date: Tue, 2 Oct 2018 21:07:16 -0700 Subject: [PATCH 2/6] Added task flow manager --- R/task-manager.R | 265 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 
265 insertions(+) create mode 100644 R/task-manager.R diff --git a/R/task-manager.R b/R/task-manager.R new file mode 100644 index 00000000..cc37761d --- /dev/null +++ b/R/task-manager.R @@ -0,0 +1,265 @@ +#' @export +TaskWorkflowManager <- R6::R6Class( + "TaskManager", + public = list( + initialize = function(){ + + }, + threads = 1, + maxTasksPerRequest = 100, + createTask = function(jobId, taskId, rCommand, ...) { + config <- getConfiguration() + storageClient <- config$storageClient + + args <- list(...) + .doAzureBatchGlobals <- args$envir + dependsOn <- args$dependsOn + argsList <- args$args + cloudCombine <- args$cloudCombine + userOutputFiles <- args$outputFiles + containerImage <- args$containerImage + + accountName <- storageClient$authentication$name + + resourceFiles <- NULL + if (!is.null(argsList)) { + envFile <- paste0(taskId, ".rds") + saveRDS(argsList, file = envFile) + storageClient$blobOperations$uploadBlob( + jobId, + file.path(getwd(), envFile) + ) + file.remove(envFile) + + readToken <- storageClient$generateSasToken("r", "c", jobId) + envFileUrl <- + rAzureBatch::createBlobUrl( + storageClient$authentication$name, + jobId, + envFile, + readToken, + config$endpointSuffix) + resourceFiles <- + list(rAzureBatch::createResourceFile(url = envFileUrl, fileName = envFile)) + } + + # Only use the download command if cloudCombine is enabled + # Otherwise just leave it empty + commands <- c() + + containerSettings <- list( + imageName = containerImage, + containerRunOptions = "--rm" + ) + + if (!is.null(cloudCombine)) { + assign("cloudCombine", cloudCombine, .doAzureBatchGlobals) + containerSettings$imageName <- "brianlovedocker/doazureparallel-merge-dockerfile:0.12.1" + + copyCommand <- sprintf( + "%s %s %s --download --saskey $BLOBXFER_SASKEY --remoteresource . --include results/*.rds --endpoint %s", + accountName, + jobId, + "$AZ_BATCH_TASK_WORKING_DIR", + config$endpointSuffix + ) + + commands <- c(paste("blobxfer", copyCommand)) + } + + exitConditions <- NULL + if (!is.null(args$dependsOn)) { + dependsOn <- args$dependsOn + } + else { + exitConditions <- list(default = list(dependencyAction = "satisfy")) + } + + containerUrl <- + rAzureBatch::createBlobUrl( + storageAccount = storageClient$authentication$name, + containerName = jobId, + sasToken = storageClient$generateSasToken("w", "c", jobId), + storageEndpointSuffix = config$endpointSuffix + ) + + outputFiles <- list( + list( + filePattern = paste0(taskId, ".txt"), + destination = list(container = list( + path = paste0("logs/", taskId, ".txt"), + containerUrl = containerUrl + )), + uploadOptions = list(uploadCondition = "taskCompletion") + ), + list( + filePattern = "../stdout.txt", + destination = list(container = list( + path = paste0("stdout/", taskId, "-stdout.txt"), + containerUrl = containerUrl + )), + uploadOptions = list(uploadCondition = "taskCompletion") + ), + list( + filePattern = "../stderr.txt", + destination = list(container = list( + path = paste0("stderr/", taskId, "-stderr.txt"), + containerUrl = containerUrl + )), + uploadOptions = list(uploadCondition = "taskCompletion") + ) + ) + + outputFiles <- append(outputFiles, userOutputFiles) + + commands <- + c(commands, + rCommand) + + commands <- linuxWrapCommands(commands) + + sasToken <- storageClient$generateSasToken("rwcl", "c", jobId) + queryParameterUrl <- "?" 
+ + for (query in names(sasToken)) { + queryParameterUrl <- + paste0(queryParameterUrl, + query, + "=", + RCurl::curlEscape(sasToken[[query]]), + "&") + } + + queryParameterUrl <- + substr(queryParameterUrl, 1, nchar(queryParameterUrl) - 1) + + body <- list(id = taskId, + commandLine = commands, + userIdentity = list( + autoUser = list( + scope = "pool", + elevationLevel = "admin" + ) + ), + resourceFiles = resourceFiles, + dependsOn = dependsOn, + outputFiles = outputFiles, + constraints = list( + maxTaskRetryCount = 3 + ), + exitConditions = exitConditions, + containerSettings = containerSettings) + + body <- Filter(length, body) + + body + }, + addTaskCollection = function( + jobId, + tasksToAdd, + chunksToAdd, + threads = 1 + ){ + config <- getConfiguration() + batchClient <- config$batchClient + + len <- length(tasks) + + startIndex <- 1 + endIndex <- self$maxTasksPerRequest + front <- 1 + back <- length(tasks) + + failedToAdd <- vector("list", length = length(tasks)) + + chunkTasksToAdd <- list() + while (endIndex < len) { + chunkTasksToAdd <- tasks[startIndex:endIndex] + response <- batchClient$taskOperations$addCollection( + jobId, + list(value = chunkTasksToAdd), + content = "response" + ) + + values <- httr::content(response)$value + if (response$status_code == 413) { + + } + else if (500 <= response$status_code && + response$status_code <= 599) { + + } + else { + + } + + for (i in 1:length(values)) { + if (compare(values[[i]]$status, "servererror")) { + + } + else if (compare(values[[i]]$status, "clienterror") && + values[[i]]$error$code != "TaskExists") { + failedToAdd + } + else { + + } + } + + startIndex = startIndex + self$maxTasksPerRequest + endIndex = endIndex + self$maxTasksPerRequest + } + + if (startIndex < len) { + response <- batchClient$taskOperations$addCollection( + jobId, + list(value = tasks[startIndex:len]), + content = "response" + ) + + print(response) + } + }, + addTasks = function( + jobId, + tasks, + chunksToAdd + ){ + config <- getConfiguration() + batchClient <- config$batchClient + + response <- batchClient$taskOperations$addCollection( + jobId, + list(value = chunkTasksToAdd), + content = "response" + ) + + values <- httr::content(response)$value + if (response$status_code == 413) { + + } + else if (500 <= response$status_code && + response$status_code <= 599) { + + } + else { + + } + + for (i in 1:length(values)) { + if (compare(values[[i]]$status, "servererror")) { + + } + else if (compare(values[[i]]$status, "clienterror") && + values[[i]]$error$code != "TaskExists") { + failedToAdd + } + else { + + } + } + } + ) +) + +TaskWorkflowManager <- TaskWorkflowManager$new() From ae9c5ca04104250a0fbba2e9f01956d0a23ac583 Mon Sep 17 00:00:00 2001 From: Brian Hoang Date: Tue, 2 Oct 2018 22:13:53 -0700 Subject: [PATCH 3/6] Clean up tasks --- R/task-manager.R | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/R/task-manager.R b/R/task-manager.R index cc37761d..74ee74c1 100644 --- a/R/task-manager.R +++ b/R/task-manager.R @@ -3,8 +3,10 @@ TaskWorkflowManager <- R6::R6Class( "TaskManager", public = list( initialize = function(){ - + errors = list() }, + + errors = list(), threads = 1, maxTasksPerRequest = 100, createTask = function(jobId, taskId, rCommand, ...) 
{ @@ -223,7 +225,7 @@ TaskWorkflowManager <- R6::R6Class( addTasks = function( jobId, tasks, - chunksToAdd + chunkTasksToAdd ){ config <- getConfiguration() batchClient <- config$batchClient @@ -233,19 +235,29 @@ TaskWorkflowManager <- R6::R6Class( list(value = chunkTasksToAdd), content = "response" ) - - values <- httr::content(response)$value + + # In case of a chunk exceeding the MaxMessageSize split chunk in half + # and resubmit smaller chunk requests if (response$status_code == 413) { - + midpoint <- length(chunkTasksToAdd) / 2 + addTasks(jobId, + tasks, + chunkTasksToAdd[1:midpoint]) + + <- addTasks(jobId, + tasks, + chunkTasksToAdd[midpoint:length(chunkTasksToAdd)]) } else if (500 <= response$status_code && response$status_code <= 599) { - + } else { } - + + values <- httr::content(response)$value + for (i in 1:length(values)) { if (compare(values[[i]]$status, "servererror")) { @@ -258,8 +270,13 @@ TaskWorkflowManager <- R6::R6Class( } } + + return(list( + addedTasks = addedTasks, + failedTasks = failedToAdd + )) } ) ) -TaskWorkflowManager <- TaskWorkflowManager$new() +TaskWorkflowManager <- TaskWorkflowManager$new() \ No newline at end of file From a3b39ef9dddd28f1f9e9a0e056bd6c4c50b66d4e Mon Sep 17 00:00:00 2001 From: Brian Hoang Date: Fri, 5 Oct 2018 10:56:53 -0700 Subject: [PATCH 4/6] Added queue class --- R/doAzureParallel.R | 102 +++------------------------- R/task-manager.R | 162 ++++++++++++++++++++++---------------------- 2 files changed, 91 insertions(+), 173 deletions(-) diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R index 32430086..b801f4d0 100644 --- a/R/doAzureParallel.R +++ b/R/doAzureParallel.R @@ -650,8 +650,12 @@ setHttpTraffic <- function(value = FALSE) { return(task) }) - TaskWorkflowManager$addTaskCollection(id, tasks) + # Submit collection of tasks + TaskWorkflowManager <- TaskWorkflowManager$new(tasks) + TaskWorkflowManager$handleTaskCollection(id, parallel) + # Process the report, validate + TaskWorkflowManager$failedTasksToAdd if (enableCloudCombine) { cat("\nSubmitting merge task") taskDependencies <- list(taskIdRanges = list(list( @@ -708,6 +712,9 @@ setHttpTraffic <- function(value = FALSE) { if (typeof(cloudCombine) == "list" && enableCloudCombine) { tempFile <- tempfile("doAzureParallel", fileext = ".rds") + tryCatch( + + ) response <- storageClient$blobOperations$downloadBlob( id, paste0("results/", "merge-result.rds"), @@ -724,7 +731,7 @@ setHttpTraffic <- function(value = FALSE) { numberOfFailedTasks <- sum(unlist(failTasks)) if (numberOfFailedTasks > 0 && autoDeleteJob == FALSE) { - .createErrorViewerPane(id, failTasks) + viewErrors(id, failTasks) } if (!identical(function(a, ...) c(a, list(...)), @@ -787,94 +794,3 @@ setHttpTraffic <- function(value = FALSE) { return(id) } } - -.createErrorViewerPane <- function(id, failTasks) { - config <- getConfiguration() - storageClient <- config$storageClient - - sasToken <- storageClient$generateSasToken("r", "c", id) - queryParameterUrl <- "?" - - for (query in names(sasToken)) { - queryParameterUrl <- - paste0(queryParameterUrl, - query, - "=", - RCurl::curlEscape(sasToken[[query]]), - "&") - } - - queryParameterUrl <- - substr(queryParameterUrl, 1, nchar(queryParameterUrl) - 1) - - tempDir <- tempfile() - dir.create(tempDir) - htmlFile <- file.path(tempDir, paste0(id, ".html")) - azureStorageUrl <- - paste0("http://", - storageCredentials$name, - sprintf(".blob.%s/", storageCredentials$endpointSuffix), - id) - - staticHtml <- "

<h1>Errors:</h1>"
-  for (i in 1:length(failTasks)) {
-    if (failTasks[i] == 1) {
-      stdoutFile <- paste0(azureStorageUrl, "/", "stdout")
-      stderrFile <- paste0(azureStorageUrl, "/", "stderr")
-      rlogFile <- paste0(azureStorageUrl, "/", "logs")
-
-      stdoutFile <-
-        paste0(stdoutFile,
-               "/",
-               id,
-               "-task",
-               i,
-               "-stdout.txt",
-               queryParameterUrl)
-      stderrFile <-
-        paste0(stderrFile,
-               "/",
-               id,
-               "-task",
-               i,
-               "-stderr.txt",
-               queryParameterUrl)
-      rlogFile <-
-        paste0(rlogFile,
-               "/",
-               id,
-               "-task",
-               i,
-               ".txt",
-               queryParameterUrl)
-
-      staticHtml <-
-        paste0(
-          staticHtml,
-          "Task ",
-          i,
-          " | <a href='",
-          stdoutFile,
-          "'>stdout.txt</a>",
-          " |",
-          " <a href='",
-          stderrFile,
-          "'>stderr.txt</a>",
-          " | <a href='",
-          rlogFile,
-          "'>R output</a>",
-          "<br/>
" - ) - } - } - - write(staticHtml, htmlFile) - - viewer <- getOption("viewer") - if (!is.null(viewer)) { - viewer(htmlFile) - } -} diff --git a/R/task-manager.R b/R/task-manager.R index 74ee74c1..ab7d4f72 100644 --- a/R/task-manager.R +++ b/R/task-manager.R @@ -2,11 +2,16 @@ TaskWorkflowManager <- R6::R6Class( "TaskManager", public = list( - initialize = function(){ - errors = list() + initialize = function(tasks = list()){ + self$tasks = tasks + self$queue = tasks + self$results = vector("list", length(tasks)) + self$failedTasks = vector("list", length(tasks)) }, - - errors = list(), + tasks = NULL, + queue = NULL, + results = NULL, + failedTasks = NULL, threads = 1, maxTasksPerRequest = 100, createTask = function(jobId, taskId, rCommand, ...) { @@ -156,10 +161,8 @@ TaskWorkflowManager <- R6::R6Class( body }, - addTaskCollection = function( + handleTaskCollection = function( jobId, - tasksToAdd, - chunksToAdd, threads = 1 ){ config <- getConfiguration() @@ -167,69 +170,43 @@ TaskWorkflowManager <- R6::R6Class( len <- length(tasks) - startIndex <- 1 - endIndex <- self$maxTasksPerRequest - front <- 1 - back <- length(tasks) - - failedToAdd <- vector("list", length = length(tasks)) - - chunkTasksToAdd <- list() - while (endIndex < len) { - chunkTasksToAdd <- tasks[startIndex:endIndex] - response <- batchClient$taskOperations$addCollection( - jobId, - list(value = chunkTasksToAdd), - content = "response" - ) - - values <- httr::content(response)$value - if (response$status_code == 413) { - - } - else if (500 <= response$status_code && - response$status_code <= 599) { - - } - else { - - } - - for (i in 1:length(values)) { - if (compare(values[[i]]$status, "servererror")) { - - } - else if (compare(values[[i]]$status, "clienterror") && - values[[i]]$error$code != "TaskExists") { - failedToAdd - } - else { - - } + queueFront <- 1 + queueBack <- length(queue) + + unknownTasksFront <- 1 + unknownTasksBack <- 1 + + failedTasksFront <- 1 + failedTasksBack <- 1 + + tryCatch({ + chunkTasksToAdd <- NULL + while (queueFront != queueBack) { + startIndex <- queue$front + endIndex <- startIndex + self$maxTasksPerRequest + chunkTasksToAdd <- tasks[startIndex:endIndex] + + report <- addBulkTasks( + jobId, + self$results, + chunkTasksToAdd + ) + + queueFront = queueFront + self$maxTasksPerRequest } - - startIndex = startIndex + self$maxTasksPerRequest - endIndex = endIndex + self$maxTasksPerRequest - } - - if (startIndex < len) { - response <- batchClient$taskOperations$addCollection( - jobId, - list(value = tasks[startIndex:len]), - content = "response" - ) - - print(response) - } + }, + error = function(e){ + + }) }, - addTasks = function( + addBulkTasks = function( jobId, - tasks, + results, chunkTasksToAdd ){ config <- getConfiguration() batchClient <- config$batchClient - + response <- batchClient$taskOperations$addCollection( jobId, list(value = chunkTasksToAdd), @@ -239,44 +216,69 @@ TaskWorkflowManager <- R6::R6Class( # In case of a chunk exceeding the MaxMessageSize split chunk in half # and resubmit smaller chunk requests if (response$status_code == 413) { + if(length(chunkTasksToAdd) == 1){ + stop("Failed to add task with ID %s due to the body" + + " exceeding the maximum request size" + chunkTasksToAdd$id) + } + midpoint <- length(chunkTasksToAdd) / 2 - addTasks(jobId, - tasks, - chunkTasksToAdd[1:midpoint]) - <- addTasks(jobId, - tasks, - chunkTasksToAdd[midpoint:length(chunkTasksToAdd)]) + self$addBulkTasks( + jobId, + tasks, + chunkTasksToAdd[midpoint:length(chunkTasksToAdd)]) + + 
self$addBulkTasks( + jobId, + tasks, + chunkTasksToAdd[midpoint:length(chunkTasksToAdd)]) } else if (500 <= response$status_code && response$status_code <= 599) { - + failedTasks[[failed]] } else { - + unknownTasks[[unknown]] } values <- httr::content(response)$value for (i in 1:length(values)) { if (compare(values[[i]]$status, "servererror")) { - + self$queue$push(values[[i]]) } else if (compare(values[[i]]$status, "clienterror") && values[[i]]$error$code != "TaskExists") { - failedToAdd + self$failedTasks$push(values[[i]]) } else { - + self$results$push(values[[i]]) } } - - return(list( - addedTasks = addedTasks, - failedTasks = failedToAdd - )) } ) ) -TaskWorkflowManager <- TaskWorkflowManager$new() \ No newline at end of file +TaskWorkflowManager <- TaskWorkflowManager$new() + +Queue <- R6::R6Class( + "Queue", + public = list( + initialize = function(size){ + array = vector("list", size) + }, + slice = function(start, end){ + array[start:end] + }, + push = function(object){ + + }, + pop = function(){ + + }, + array = NULL, + size = NULL, + front = NULL, + back = NULL + ) +) \ No newline at end of file From 57d22631412237483067dc18edb0a2debf946fa1 Mon Sep 17 00:00:00 2001 From: Brian Hoang Date: Mon, 8 Oct 2018 14:35:41 -0700 Subject: [PATCH 5/6] Supported max size request --- R/doAzureParallel.R | 18 +++--- R/task-manager.R | 136 ++++++++++++++++++++------------------------ R/utility.R | 92 ++++++++++++++++++++++++++++++ 3 files changed, 162 insertions(+), 84 deletions(-) diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R index b801f4d0..a8d49d30 100644 --- a/R/doAzureParallel.R +++ b/R/doAzureParallel.R @@ -331,7 +331,12 @@ setHttpTraffic <- function(value = FALSE) { if (!is.null(obj$options$azure$chunksize)) { chunkSize <- obj$options$azure$chunksize } - + + threads <- 1 + if (!is.null(obj$options$azure$threads)) { + threads <- obj$options$azure$threads + } + chunkSizeKeyValuePair <- list(name = "chunkSize", value = as.character(chunkSize)) @@ -638,6 +643,7 @@ setHttpTraffic <- function(value = FALSE) { as.character(obj$errorHandling)), envir = .doAzureBatchGlobals, packages = obj$packages, + resourceFiles = resourceFiles, outputFiles = mergeOutput, containerImage = data$containerImage, args = args, @@ -651,11 +657,8 @@ setHttpTraffic <- function(value = FALSE) { }) # Submit collection of tasks - TaskWorkflowManager <- TaskWorkflowManager$new(tasks) - TaskWorkflowManager$handleTaskCollection(id, parallel) - - # Process the report, validate - TaskWorkflowManager$failedTasksToAdd + TaskWorkflowManager$handleTaskCollection(id, tasks, threads) + if (enableCloudCombine) { cat("\nSubmitting merge task") taskDependencies <- list(taskIdRanges = list(list( @@ -712,9 +715,6 @@ setHttpTraffic <- function(value = FALSE) { if (typeof(cloudCombine) == "list" && enableCloudCombine) { tempFile <- tempfile("doAzureParallel", fileext = ".rds") - tryCatch( - - ) response <- storageClient$blobOperations$downloadBlob( id, paste0("results/", "merge-result.rds"), diff --git a/R/task-manager.R b/R/task-manager.R index ab7d4f72..dda7f33f 100644 --- a/R/task-manager.R +++ b/R/task-manager.R @@ -2,16 +2,13 @@ TaskWorkflowManager <- R6::R6Class( "TaskManager", public = list( - initialize = function(tasks = list()){ - self$tasks = tasks - self$queue = tasks - self$results = vector("list", length(tasks)) - self$failedTasks = vector("list", length(tasks)) + initialize = function(){ }, - tasks = NULL, - queue = NULL, + originalTaskCollection = NULL, + tasksToAdd = NULL, results = NULL, failedTasks = NULL, + 
errors = NULL, threads = 1, maxTasksPerRequest = 100, createTask = function(jobId, taskId, rCommand, ...) { @@ -25,10 +22,9 @@ TaskWorkflowManager <- R6::R6Class( cloudCombine <- args$cloudCombine userOutputFiles <- args$outputFiles containerImage <- args$containerImage - + resourceFiles <- args$resourceFiles accountName <- storageClient$authentication$name - resourceFiles <- NULL if (!is.null(argsList)) { envFile <- paste0(taskId, ".rds") saveRDS(argsList, file = envFile) @@ -163,36 +159,43 @@ TaskWorkflowManager <- R6::R6Class( }, handleTaskCollection = function( jobId, + tasks, threads = 1 ){ - config <- getConfiguration() - batchClient <- config$batchClient - - len <- length(tasks) - - queueFront <- 1 - queueBack <- length(queue) + size <- length(tasks) + self$originalTaskCollection <- tasks - unknownTasksFront <- 1 - unknownTasksBack <- 1 + self$tasksToAdd <- datastructures::queue() + self$tasksToAdd <- datastructures::insert(self$tasksToAdd, tasks) - failedTasksFront <- 1 - failedTasksBack <- 1 + self$results <- datastructures::queue() + self$failedTasks <- datastructures::queue() + self$errors <- datastructures::queue() + config <- getConfiguration() + batchClient <- config$batchClient + tryCatch({ - chunkTasksToAdd <- NULL - while (queueFront != queueBack) { - startIndex <- queue$front - endIndex <- startIndex + self$maxTasksPerRequest - chunkTasksToAdd <- tasks[startIndex:endIndex] + while (datastructures::size(self$tasksToAdd) > 0 && + datastructures::size(self$errors) == 0) { + maxTasks <- self$maxTasksPerRequest + if (datastructures::size(self$tasksToAdd) < maxTasks) { + maxTasks <- datastructures::size(self$tasksToAdd) + } - report <- addBulkTasks( + chunkTasksToAdd <- vector("list", maxTasks) + index <- 1 + + while (index <= maxTasks && + datastructures::size(self$tasksToAdd) > 0){ + chunkTasksToAdd[[index]]<- datastructures::pop(self$tasksToAdd) + index <- index + 1 + } + + report <- self$addBulkTasks( jobId, - self$results, chunkTasksToAdd ) - - queueFront = queueFront + self$maxTasksPerRequest } }, error = function(e){ @@ -201,7 +204,6 @@ TaskWorkflowManager <- R6::R6Class( }, addBulkTasks = function( jobId, - results, chunkTasksToAdd ){ config <- getConfiguration() @@ -217,68 +219,52 @@ TaskWorkflowManager <- R6::R6Class( # and resubmit smaller chunk requests if (response$status_code == 413) { if(length(chunkTasksToAdd) == 1){ + self$errors$push(response) + stop("Failed to add task with ID %s due to the body" + - " exceeding the maximum request size" + chunkTasksToAdd$id) + " exceeding the maximum request size" + chunkTasksToAdd[[1]]$id) } - midpoint <- length(chunkTasksToAdd) / 2 + upperBound <- length(chunkTasksToAdd) + midBound <- upperBound / 2 + self$addBulkTasks( jobId, - tasks, - chunkTasksToAdd[midpoint:length(chunkTasksToAdd)]) + chunkTasksToAdd[1:midBound]) self$addBulkTasks( jobId, - tasks, - chunkTasksToAdd[midpoint:length(chunkTasksToAdd)]) + chunkTasksToAdd[(midBound+1):upperBound]) } else if (500 <= response$status_code && response$status_code <= 599) { - failedTasks[[failed]] - } - else { - unknownTasks[[unknown]] + self$tasksToAdd <- datastructures::insert(self$tasksToAdd, chunkTasksToAdd) } - - values <- httr::content(response)$value - - for (i in 1:length(values)) { - if (compare(values[[i]]$status, "servererror")) { - self$queue$push(values[[i]]) - } - else if (compare(values[[i]]$status, "clienterror") && - values[[i]]$error$code != "TaskExists") { - self$failedTasks$push(values[[i]]) - } - else { - self$results$push(values[[i]]) + else if 
(response$status_code == 200){ + values <- httr::content(response)$value + + for (i in 1:length(values)) { + taskId <- values[[i]]$id + + if (compare(values[[i]]$status, "servererror")) { + self$tasksToAdd <- datastructures::insert(self$tasksToAdd, self$originalTaskCollection[[taskId]]) + } + else if (compare(values[[i]]$status, "clienterror") && + values[[i]]$error$code != "TaskExists") { + self$failedTasks <- datastructures::insert(self$failedTasks, values[[i]]) + } + else { + self$results <- datastructures::insert(self$results, values[[i]]) + } } } + else { + self$tasksToAdd <- datastructures::insert(self$tasksToAdd, chunkTasksToAdd) + self$errors <- datastructures::insert(self$errors, response) + } } ) ) TaskWorkflowManager <- TaskWorkflowManager$new() - -Queue <- R6::R6Class( - "Queue", - public = list( - initialize = function(size){ - array = vector("list", size) - }, - slice = function(start, end){ - array[start:end] - }, - push = function(object){ - - }, - pop = function(){ - - }, - array = NULL, - size = NULL, - front = NULL, - back = NULL - ) -) \ No newline at end of file diff --git a/R/utility.R b/R/utility.R index 0bf689d4..7f0b63ad 100644 --- a/R/utility.R +++ b/R/utility.R @@ -304,3 +304,95 @@ getHttpErrorMessage <- function(responseObj) { detailMessage <- paste0(detailMessage, "\r\nodata.metadata: ", responseObj$odata.metadata) return(detailMessage) } + +viewErrors <- function(id, failTasks) { + config <- getConfiguration() + storageClient <- config$storageClient + + sasToken <- storageClient$generateSasToken("r", "c", id) + queryParameterUrl <- "?" + + for (query in names(sasToken)) { + queryParameterUrl <- + paste0(queryParameterUrl, + query, + "=", + RCurl::curlEscape(sasToken[[query]]), + "&") + } + + queryParameterUrl <- + substr(queryParameterUrl, 1, nchar(queryParameterUrl) - 1) + + tempDir <- tempfile() + dir.create(tempDir) + htmlFile <- file.path(tempDir, paste0(id, ".html")) + azureStorageUrl <- + paste0("http://", + storageCredentials$name, + sprintf(".blob.%s/", storageCredentials$endpointSuffix), + id) + + staticHtml <- "

<h1>Errors:</h1>"
+  for (i in 1:length(failTasks)) {
+    if (failTasks[i] == 1) {
+      stdoutFile <- paste0(azureStorageUrl, "/", "stdout")
+      stderrFile <- paste0(azureStorageUrl, "/", "stderr")
+      rlogFile <- paste0(azureStorageUrl, "/", "logs")
+
+      stdoutFile <-
+        paste0(stdoutFile,
+               "/",
+               id,
+               "-task",
+               i,
+               "-stdout.txt",
+               queryParameterUrl)
+      stderrFile <-
+        paste0(stderrFile,
+               "/",
+               id,
+               "-task",
+               i,
+               "-stderr.txt",
+               queryParameterUrl)
+      rlogFile <-
+        paste0(rlogFile,
+               "/",
+               id,
+               "-task",
+               i,
+               ".txt",
+               queryParameterUrl)
+
+      staticHtml <-
+        paste0(
+          staticHtml,
+          "Task ",
+          i,
+          " | <a href='",
+          stdoutFile,
+          "'>stdout.txt</a>",
+          " |",
+          " <a href='",
+          stderrFile,
+          "'>stderr.txt</a>",
+          " | <a href='",
+          rlogFile,
+          "'>R output</a>",
+          "<br/>
" + ) + } + } + + write(staticHtml, htmlFile) + + viewer <- getOption("viewer") + if (!is.null(viewer)) { + viewer(htmlFile) + } +} + From 5f45e0ac1b6871d1cc33351d8b600c9995cff77d Mon Sep 17 00:00:00 2001 From: Brian Hoang Date: Mon, 8 Oct 2018 15:31:05 -0700 Subject: [PATCH 6/6] Removed resource files param --- R/doAzureParallel.R | 1 - 1 file changed, 1 deletion(-) diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R index a8d49d30..2194a5be 100644 --- a/R/doAzureParallel.R +++ b/R/doAzureParallel.R @@ -643,7 +643,6 @@ setHttpTraffic <- function(value = FALSE) { as.character(obj$errorHandling)), envir = .doAzureBatchGlobals, packages = obj$packages, - resourceFiles = resourceFiles, outputFiles = mergeOutput, containerImage = data$containerImage, args = args,