From 4e9f3050289b41f2816dc8df807acf1e9df7e25c Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 2 Jun 2025 08:45:35 -0400 Subject: [PATCH 1/3] fix: update correct server mode in parseable.json Steps to reproduce issue - 1. start prism and query node with enterprise build, stop both nodes 2. start standalone/distributed-Query with same staging and s3 bucket -- No env change -- hence Deployment ID remains same, server_mode remains Prism Fix: for mode `All` and `Query` even for no env change, update server_mode in parseable.json in storage --- src/storage/store_metadata.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/storage/store_metadata.rs b/src/storage/store_metadata.rs index 2152f3d07..f0232bb93 100644 --- a/src/storage/store_metadata.rs +++ b/src/storage/store_metadata.rs @@ -117,9 +117,23 @@ pub async fn resolve_parseable_metadata( let mut overwrite_remote = false; let res = match check { - EnvChange::None(metadata) => { + EnvChange::None(mut metadata) => { // overwrite staging anyways so that it matches remote in case of any divergence overwrite_staging = true; + match PARSEABLE.options.mode { + Mode::All => { + metadata.server_mode.standalone_after_distributed()?; + overwrite_remote = true; + metadata.server_mode = PARSEABLE.options.mode; + metadata.staging = PARSEABLE.options.staging_dir().to_path_buf(); + } + Mode::Query => { + overwrite_remote = true; + metadata.server_mode = PARSEABLE.options.mode; + metadata.staging = PARSEABLE.options.staging_dir().to_path_buf(); + } + _=> {} + } if PARSEABLE.options.mode == Mode::All { metadata.server_mode.standalone_after_distributed()?; } From 4eb49da7a0973eb6b9135b1bb3403a0bb12e70d7 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 2 Jun 2025 11:54:28 -0400 Subject: [PATCH 2/3] refactor --- src/storage/store_metadata.rs | 70 +++++++++++++++-------------------- 1 file changed, 30 insertions(+), 40 deletions(-) diff --git a/src/storage/store_metadata.rs b/src/storage/store_metadata.rs index f0232bb93..9926f64b7 100644 --- a/src/storage/store_metadata.rs +++ b/src/storage/store_metadata.rs @@ -116,7 +116,7 @@ pub async fn resolve_parseable_metadata( let mut overwrite_staging = false; let mut overwrite_remote = false; - let res = match check { + let res: Result = match check { EnvChange::None(mut metadata) => { // overwrite staging anyways so that it matches remote in case of any divergence overwrite_staging = true; @@ -124,13 +124,11 @@ pub async fn resolve_parseable_metadata( Mode::All => { metadata.server_mode.standalone_after_distributed()?; overwrite_remote = true; - metadata.server_mode = PARSEABLE.options.mode; - metadata.staging = PARSEABLE.options.staging_dir().to_path_buf(); + update_metadata_mode_and_staging(&mut metadata); } Mode::Query => { overwrite_remote = true; - metadata.server_mode = PARSEABLE.options.mode; - metadata.staging = PARSEABLE.options.staging_dir().to_path_buf(); + update_metadata_mode_and_staging(&mut metadata); } _=> {} } @@ -143,46 +141,33 @@ pub async fn resolve_parseable_metadata( Err("Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server") } EnvChange::NewStaging(mut metadata) => { - - // if server is started in ingest mode,we need to make sure that query mode has been started - // i.e the metadata is updated to reflect the server mode = Query - if metadata.server_mode== Mode::All && PARSEABLE.options.mode == Mode::Ingest { - Err("Starting Ingest Mode is not allowed, Since Query Server has not been started yet") - } else { - create_dir_all(PARSEABLE.options.staging_dir())?; - metadata.staging = PARSEABLE.options.staging_dir().canonicalize()?; + // If server is started in ingest mode, ensure query mode has been started + if metadata.server_mode == Mode::All && PARSEABLE.options.mode == Mode::Ingest { + return Err(ObjectStorageError::UnhandledError(format!( + "Starting Ingest Mode is not allowed, Since Query Server has not been started yet. {}", + JOIN_COMMUNITY + ).into())); + } + create_dir_all(PARSEABLE.options.staging_dir())?; + metadata.staging = PARSEABLE.options.staging_dir().canonicalize()?; // this flag is set to true so that metadata is copied to staging - overwrite_staging = true; - // overwrite remote in all and query mode - // because staging dir has changed. - match PARSEABLE.options.mode { - Mode::All => { - metadata.server_mode.standalone_after_distributed() - .map_err(|err| { - ObjectStorageError::Custom(err.to_string()) - })?; - overwrite_remote = true; - }, - Mode::Query | Mode::Prism => { + overwrite_staging = true; + // overwrite remote in all and query mode + // because staging dir has changed. + match PARSEABLE.options.mode { + Mode::All => { + metadata.server_mode.standalone_after_distributed() + .map_err(|err| ObjectStorageError::Custom(err.to_string()))?; + overwrite_remote = true; + } + Mode::Query | Mode::Prism | Mode::Ingest | Mode::Index => { + update_metadata_mode_and_staging(&mut metadata); + if matches!(PARSEABLE.options.mode, Mode::Query | Mode::Prism) { overwrite_remote = true; - metadata.server_mode = PARSEABLE.options.mode; - metadata.staging = PARSEABLE.options.staging_dir().to_path_buf(); - }, - Mode::Ingest => { - // if ingest server is started fetch the metadata from remote - // update the server mode for local metadata - metadata.server_mode = PARSEABLE.options.mode; - metadata.staging = PARSEABLE.options.staging_dir().to_path_buf(); - }, - Mode::Index => { - // if index server is started fetch the metadata from remote - // update the server mode for local metadata - metadata.server_mode = PARSEABLE.options.mode; - metadata.staging = PARSEABLE.options.staging_dir().to_path_buf(); } } - Ok(metadata) } + Ok(metadata) } EnvChange::CreateBoth => { create_dir_all(PARSEABLE.options.staging_dir())?; @@ -217,6 +202,11 @@ pub async fn resolve_parseable_metadata( Ok(metadata) } +fn update_metadata_mode_and_staging(metadata: &mut StorageMetadata) { + metadata.server_mode = PARSEABLE.options.mode; + metadata.staging = PARSEABLE.options.staging_dir().to_path_buf(); +} + pub fn determine_environment( staging_metadata: Option, remote_metadata: Option, From 51714599b55caf32d367c5c2bca14ad6c68ea836 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 2 Jun 2025 22:01:05 -0400 Subject: [PATCH 3/3] deepsource fix --- src/storage/store_metadata.rs | 170 ++++++++++++++++++---------------- 1 file changed, 91 insertions(+), 79 deletions(-) diff --git a/src/storage/store_metadata.rs b/src/storage/store_metadata.rs index 9926f64b7..22b21176d 100644 --- a/src/storage/store_metadata.rs +++ b/src/storage/store_metadata.rs @@ -110,91 +110,15 @@ pub async fn resolve_parseable_metadata( .as_ref() .map(|meta| serde_json::from_slice(meta).expect("parseable config is valid json")); - // Env Change needs to be updated - let check = determine_environment(staging_metadata, remote_metadata); - // flags for if metadata needs to be synced - let mut overwrite_staging = false; - let mut overwrite_remote = false; + let env_change = determine_environment(staging_metadata, remote_metadata); - let res: Result = match check { - EnvChange::None(mut metadata) => { - // overwrite staging anyways so that it matches remote in case of any divergence - overwrite_staging = true; - match PARSEABLE.options.mode { - Mode::All => { - metadata.server_mode.standalone_after_distributed()?; - overwrite_remote = true; - update_metadata_mode_and_staging(&mut metadata); - } - Mode::Query => { - overwrite_remote = true; - update_metadata_mode_and_staging(&mut metadata); - } - _=> {} - } - if PARSEABLE.options.mode == Mode::All { - metadata.server_mode.standalone_after_distributed()?; - } - Ok(metadata) - }, - EnvChange::NewRemote => { - Err("Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server") - } - EnvChange::NewStaging(mut metadata) => { - // If server is started in ingest mode, ensure query mode has been started - if metadata.server_mode == Mode::All && PARSEABLE.options.mode == Mode::Ingest { - return Err(ObjectStorageError::UnhandledError(format!( - "Starting Ingest Mode is not allowed, Since Query Server has not been started yet. {}", - JOIN_COMMUNITY - ).into())); - } - create_dir_all(PARSEABLE.options.staging_dir())?; - metadata.staging = PARSEABLE.options.staging_dir().canonicalize()?; - // this flag is set to true so that metadata is copied to staging - overwrite_staging = true; - // overwrite remote in all and query mode - // because staging dir has changed. - match PARSEABLE.options.mode { - Mode::All => { - metadata.server_mode.standalone_after_distributed() - .map_err(|err| ObjectStorageError::Custom(err.to_string()))?; - overwrite_remote = true; - } - Mode::Query | Mode::Prism | Mode::Ingest | Mode::Index => { - update_metadata_mode_and_staging(&mut metadata); - if matches!(PARSEABLE.options.mode, Mode::Query | Mode::Prism) { - overwrite_remote = true; - } - } - } - Ok(metadata) - } - EnvChange::CreateBoth => { - create_dir_all(PARSEABLE.options.staging_dir())?; - let metadata = StorageMetadata::default(); - // new metadata needs to be set - // if mode is query or all then both staging and remote - match PARSEABLE.options.mode { - Mode::All | Mode::Query | Mode::Prism => overwrite_remote = true, - _ => (), - } - // else only staging - overwrite_staging = true; - Ok(metadata) - } - }; - - let mut metadata = res.map_err(|err| { - let err = format!("{}. {}", err, JOIN_COMMUNITY); - let err: Box = err.into(); - ObjectStorageError::UnhandledError(err) - })?; + let (mut metadata, overwrite_staging, overwrite_remote) = process_env_change(env_change)?; metadata.server_mode = PARSEABLE.options.mode; + if overwrite_remote { put_remote_metadata(&metadata).await?; } - if overwrite_staging { put_staging_metadata(&metadata)?; } @@ -202,6 +126,94 @@ pub async fn resolve_parseable_metadata( Ok(metadata) } +fn process_env_change( + env_change: EnvChange, +) -> Result<(StorageMetadata, bool, bool), ObjectStorageError> { + match env_change { + EnvChange::None(mut metadata) => handle_none_env(&mut metadata), + EnvChange::NewRemote => handle_new_remote_env(), + EnvChange::NewStaging(mut metadata) => handle_new_staging_env(&mut metadata), + EnvChange::CreateBoth => handle_create_both_env(), + } +} + +fn handle_none_env( + metadata: &mut StorageMetadata, +) -> Result<(StorageMetadata, bool, bool), ObjectStorageError> { + let overwrite_staging = true; + let mut overwrite_remote = false; + + match PARSEABLE.options.mode { + Mode::All => { + metadata.server_mode.standalone_after_distributed()?; + overwrite_remote = true; + update_metadata_mode_and_staging(metadata); + } + Mode::Query => { + overwrite_remote = true; + update_metadata_mode_and_staging(metadata); + } + _ => {} + } + if PARSEABLE.options.mode == Mode::All { + metadata.server_mode.standalone_after_distributed()?; + } + Ok((metadata.clone(), overwrite_staging, overwrite_remote)) +} + +fn handle_new_remote_env() -> Result<(StorageMetadata, bool, bool), ObjectStorageError> { + Err(ObjectStorageError::UnhandledError(format!( + "Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server. {}", + JOIN_COMMUNITY + ).into())) +} + +fn handle_new_staging_env( + metadata: &mut StorageMetadata, +) -> Result<(StorageMetadata, bool, bool), ObjectStorageError> { + if metadata.server_mode == Mode::All && PARSEABLE.options.mode == Mode::Ingest { + return Err(ObjectStorageError::UnhandledError( + format!( + "Starting Ingest Mode is not allowed, Since Query Server has not been started yet. {}", + JOIN_COMMUNITY + ) + .into(), + )); + } + create_dir_all(PARSEABLE.options.staging_dir())?; + metadata.staging = PARSEABLE.options.staging_dir().canonicalize()?; + let overwrite_staging = true; + let mut overwrite_remote = false; + + match PARSEABLE.options.mode { + Mode::All => { + metadata + .server_mode + .standalone_after_distributed() + .map_err(|err| ObjectStorageError::Custom(err.to_string()))?; + overwrite_remote = true; + } + Mode::Query | Mode::Prism | Mode::Ingest | Mode::Index => { + update_metadata_mode_and_staging(metadata); + if matches!(PARSEABLE.options.mode, Mode::Query | Mode::Prism) { + overwrite_remote = true; + } + } + } + Ok((metadata.clone(), overwrite_staging, overwrite_remote)) +} + +fn handle_create_both_env() -> Result<(StorageMetadata, bool, bool), ObjectStorageError> { + create_dir_all(PARSEABLE.options.staging_dir())?; + let metadata = StorageMetadata::default(); + let overwrite_remote = matches!( + PARSEABLE.options.mode, + Mode::All | Mode::Query | Mode::Prism + ); + let overwrite_staging = true; + Ok((metadata, overwrite_staging, overwrite_remote)) +} + fn update_metadata_mode_and_staging(metadata: &mut StorageMetadata) { metadata.server_mode = PARSEABLE.options.mode; metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();