diff --git a/libs/labelbox/src/labelbox/schema/workflow/edges.py b/libs/labelbox/src/labelbox/schema/workflow/edges.py index b47e2d9ab..acab7cce8 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/edges.py +++ b/libs/labelbox/src/labelbox/schema/workflow/edges.py @@ -5,7 +5,6 @@ """ import logging -import uuid from typing import Dict, Any, Optional, TYPE_CHECKING from pydantic import BaseModel, ConfigDict, Field, PrivateAttr @@ -25,11 +24,22 @@ class WorkflowEdge(BaseModel): from a source node to a target node through specific handles. Attributes: - id: Unique identifier for the edge + id: Unique identifier for the edge (format: xy-edge__{source}{sourceHandle}-{target}{targetHandle}) source: ID of the source node target: ID of the target node - sourceHandle: Output handle on the source node (e.g., 'if', 'else') + sourceHandle: Output handle on the source node (e.g., 'if', 'else'; NodeOutput.Approved and NodeOutput.Rejected map to 'if' and 'else') targetHandle: Input handle on the target node (typically 'in') + + Edge ID Format: + Edge IDs follow the pattern: xy-edge__{source}{sourceHandle}-{target}{targetHandle} + + Example: xy-edge__node1if-node2in + - Prefix: xy-edge__ + - Source node ID: node1 + - Source handle: if + - Separator: - + - Target node ID: node2 + - Target handle: in """ id: str @@ -220,7 +230,13 @@ def _create_edge_instance( Returns: Created WorkflowEdge instance """ - edge_id = f"edge-{uuid.uuid4()}" + # Generate edge ID using the correct format: xy-edge__{source}{sourceHandle}-{target}{targetHandle} + source_handle = output_type.value + target_handle = "in" + edge_id = ( + f"xy-edge__{source.id}{source_handle}-{target.id}{target_handle}" + ) + logger.debug( f"Creating edge {edge_id} from {source.id} to {target.id} with type {output_type.value}" ) @@ -229,8 +245,8 @@ def _create_edge_instance( id=edge_id, source=source.id, target=target.id, - sourceHandle=output_type.value, - targetHandle="in", # Explicitly set targetHandle + sourceHandle=source_handle, + targetHandle=target_handle, # 
Explicitly set targetHandle ) edge.set_workflow_reference(self.workflow) return edge diff --git a/libs/labelbox/src/labelbox/schema/workflow/workflow.py b/libs/labelbox/src/labelbox/schema/workflow/workflow.py index 306f9a2c6..4af0f9e25 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/workflow.py +++ b/libs/labelbox/src/labelbox/schema/workflow/workflow.py @@ -496,7 +496,10 @@ def reset_to_initial_nodes( ) -> InitialNodes: """Reset workflow and create the two required initial nodes. - Clears all existing nodes and edges, then creates: + IMPORTANT: This method preserves existing initial node IDs to prevent workflow breakage. + A new ID is only created for an initial node that does not exist yet (first-time setup). + + Clears all nodes and edges, then recreates (reusing any existing initial node IDs): - InitialLabeling node: Entry point for new data requiring labeling - InitialRework node: Entry point for rejected data requiring corrections @@ -526,11 +529,28 @@ def reset_to_initial_nodes( rework_config.model_dump(exclude_none=True) if rework_config else {} ) - # Reset workflow configuration + # Find existing initial nodes to preserve their IDs + existing_labeling_id = None + existing_rework_id = None + + for node_data in self.config.get("nodes", []): + definition_id = node_data.get("definitionId") + if definition_id == WorkflowDefinitionId.InitialLabelingTask.value: + existing_labeling_id = node_data.get("id") + elif definition_id == WorkflowDefinitionId.InitialReworkTask.value: + existing_rework_id = node_data.get("id") + + # Reset workflow configuration (clear all nodes and edges) self.config = {"nodes": [], "edges": []} self._nodes_cache = None self._edges_cache = None + # Create/recreate initial nodes, preserving existing IDs if they exist + if existing_labeling_id: + labeling_dict["id"] = existing_labeling_id + if existing_rework_id: + rework_dict["id"] = existing_rework_id + # Create required initial nodes using internal method initial_labeling = cast( InitialLabelingNode, @@ -554,7 +574,11 @@ 
def copy_workflow_structure( target_client, target_project_id: str, ) -> "ProjectWorkflow": - """Copy the workflow structure from a source workflow to a new project.""" + """Copy the workflow structure from a source workflow to a new project. + + IMPORTANT: This method preserves the target workflow's existing initial node IDs to prevent workflow breakage. + Changing initial node IDs will completely break the workflow and require support intervention. + """ return WorkflowOperations.copy_workflow_structure( source_workflow, target_client, target_project_id ) @@ -562,7 +586,11 @@ def copy_workflow_structure( def copy_from( self, source_workflow: "ProjectWorkflow", auto_layout: bool = True ) -> "ProjectWorkflow": - """Copy the nodes and edges from a source workflow to this workflow.""" + """Copy the nodes and edges from a source workflow to this workflow. + + IMPORTANT: This method preserves this workflow's existing initial node IDs to prevent workflow breakage. + Changing initial node IDs will completely break the workflow and require support intervention. + """ return WorkflowOperations.copy_from(self, source_workflow, auto_layout) # Layout and display methods diff --git a/libs/labelbox/src/labelbox/schema/workflow/workflow_operations.py b/libs/labelbox/src/labelbox/schema/workflow/workflow_operations.py index 8129c31b7..d2c93edbc 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/workflow_operations.py +++ b/libs/labelbox/src/labelbox/schema/workflow/workflow_operations.py @@ -390,7 +390,11 @@ def copy_workflow_structure( target_client, target_project_id: str, ) -> "ProjectWorkflow": - """Copy the workflow structure from a source workflow to a new project.""" + """Copy the workflow structure from a source workflow to a new project. + + IMPORTANT: This method preserves existing initial node IDs in the target workflow + to prevent workflow breakage. All other nodes get new IDs. 
+ """ try: # Create a new workflow in the target project from labelbox.schema.workflow.workflow import ProjectWorkflow @@ -399,38 +403,74 @@ def copy_workflow_structure( target_client, target_project_id ) + # Find existing initial nodes in target workflow to preserve their IDs + existing_initial_ids = {} + for node_data in target_workflow.config.get("nodes", []): + definition_id = node_data.get("definitionId") + if ( + definition_id + == WorkflowDefinitionId.InitialLabelingTask.value + ): + existing_initial_ids[ + WorkflowDefinitionId.InitialLabelingTask.value + ] = node_data.get("id") + elif ( + definition_id + == WorkflowDefinitionId.InitialReworkTask.value + ): + existing_initial_ids[ + WorkflowDefinitionId.InitialReworkTask.value + ] = node_data.get("id") + # Get the source config new_config = source_workflow.config.copy() old_to_new_id_map = {} - # Generate new IDs for all nodes + # Generate new IDs for all nodes, but preserve existing initial node IDs if new_config.get("nodes"): - new_config["nodes"] = [ - { - **node, - "id": str(uuid.uuid4()), - } - for node in new_config["nodes"] - ] - # Create mapping of old to new IDs - old_to_new_id_map = { - old_node["id"]: new_node["id"] - for old_node, new_node in zip( - source_workflow.config["nodes"], new_config["nodes"] + updated_nodes = [] + for node in new_config["nodes"]: + definition_id = node.get("definitionId") + old_id = node["id"] + + # Preserve existing initial node IDs, generate new IDs for others + if definition_id in existing_initial_ids: + new_id = existing_initial_ids[definition_id] + else: + new_id = str(uuid.uuid4()) + + old_to_new_id_map[old_id] = new_id + updated_nodes.append( + { + **node, + "id": new_id, + } ) - } + + new_config["nodes"] = updated_nodes # Update edges to use the new node IDs if new_config.get("edges"): - new_config["edges"] = [ - { - **edge, - "id": str(uuid.uuid4()), - "source": old_to_new_id_map[edge["source"]], - "target": old_to_new_id_map[edge["target"]], - } - for edge in 
new_config["edges"] - ] + updated_edges = [] + for edge in new_config["edges"]: + source_id = old_to_new_id_map[edge["source"]] + target_id = old_to_new_id_map[edge["target"]] + source_handle = edge.get("sourceHandle", "if") + target_handle = edge.get("targetHandle", "in") + + # Generate edge ID using correct format: xy-edge__{source}{sourceHandle}-{target}{targetHandle} + edge_id = f"xy-edge__{source_id}{source_handle}-{target_id}{target_handle}" + + updated_edges.append( + { + **edge, + "id": edge_id, + "source": source_id, + "target": target_id, + } + ) + + new_config["edges"] = updated_edges # Update the target workflow with the new config target_workflow.config = new_config @@ -450,8 +490,31 @@ def copy_from( source_workflow: "ProjectWorkflow", auto_layout: bool = True, ) -> "ProjectWorkflow": - """Copy the nodes and edges from a source workflow to this workflow.""" + """Copy the nodes and edges from a source workflow to this workflow. + + IMPORTANT: This method preserves existing initial node IDs in the target workflow + to prevent workflow breakage. Only non-initial nodes get new IDs. 
+ """ try: + # Find existing initial nodes in target workflow to preserve their IDs + existing_initial_ids = {} + for node_data in workflow.config.get("nodes", []): + definition_id = node_data.get("definitionId") + if ( + definition_id + == WorkflowDefinitionId.InitialLabelingTask.value + ): + existing_initial_ids[ + WorkflowDefinitionId.InitialLabelingTask.value + ] = node_data.get("id") + elif ( + definition_id + == WorkflowDefinitionId.InitialReworkTask.value + ): + existing_initial_ids[ + WorkflowDefinitionId.InitialReworkTask.value + ] = node_data.get("id") + # Create a clean work config (without connections) work_config: Dict[str, List[Any]] = {"nodes": [], "edges": []} @@ -463,9 +526,15 @@ def copy_from( # First pass: Create all nodes by directly copying configuration for source_node_data in source_workflow.config.get("nodes", []): - # Generate a new ID for the node - new_id = f"node-{uuid.uuid4()}" + definition_id = source_node_data.get("definitionId") old_id = source_node_data.get("id") + + # Preserve existing initial node IDs, generate new IDs for others + if definition_id in existing_initial_ids: + new_id = existing_initial_ids[definition_id] + else: + new_id = f"node-{uuid.uuid4()}" + id_mapping[old_id] = new_id # Create a new node data dictionary by copying the source node @@ -498,12 +567,18 @@ def copy_from( continue # Create new edge + source_handle = source_edge_data.get("sourceHandle", "out") + target_handle = source_edge_data.get("targetHandle", "in") + + # Generate edge ID using correct format: xy-edge__{source}{sourceHandle}-{target}{targetHandle} + edge_id = f"xy-edge__{id_mapping[source_id]}{source_handle}-{id_mapping[target_id]}{target_handle}" + new_edge = { - "id": f"edge-{uuid.uuid4()}", + "id": edge_id, "source": id_mapping[source_id], "target": id_mapping[target_id], - "sourceHandle": source_edge_data.get("sourceHandle", "out"), - "targetHandle": source_edge_data.get("targetHandle", "in"), + "sourceHandle": source_handle, + 
"targetHandle": target_handle, } # Add the edge to config diff --git a/libs/labelbox/tests/integration/test_workflow.py b/libs/labelbox/tests/integration/test_workflow.py index b85494460..96cb53b46 100644 --- a/libs/labelbox/tests/integration/test_workflow.py +++ b/libs/labelbox/tests/integration/test_workflow.py @@ -35,6 +35,7 @@ consensus_average, review_time, labeled_at, + ReworkConfig, ) from labelbox.schema.media_type import MediaType @@ -769,3 +770,277 @@ def test_model_prediction_conditions(client, test_projects): assert ( filters[0]["field"] == "ModelPrediction" ), "Should have ModelPrediction filter" + + +def test_reset_to_initial_nodes_preserves_existing_ids(client): + """Test that reset_to_initial_nodes preserves existing initial node IDs.""" + # Create a new project for this test + project_name = f"ID Preservation Test {uuid.uuid4()}" + project = client.create_project( + name=project_name, media_type=MediaType.Image + ) + + try: + # Get workflow and create initial nodes + workflow = project.get_workflow() + initial_nodes = workflow.reset_to_initial_nodes() + + # Create a complete workflow by adding nodes and edges + done_node = workflow.add_node(type=NodeType.Done, name="Test Done") + workflow.add_edge(initial_nodes.labeling, done_node) + workflow.add_edge(initial_nodes.rework, done_node) + + # Record the original IDs + original_labeling_id = initial_nodes.labeling.id + original_rework_id = initial_nodes.rework.id + + # Update the workflow to save the initial state (now valid) + workflow.update_config() + + # Reset again with new configuration + new_initial_nodes = workflow.reset_to_initial_nodes( + labeling_config=LabelingConfig( + instructions="Updated instructions", + max_contributions_per_user=5, + ), + rework_config=ReworkConfig( + instructions="Updated rework instructions", + max_contributions_per_user=3, + ), + ) + + # Rebuild the workflow structure after reset + done_node = workflow.add_node(type=NodeType.Done, name="Test Done") + 
workflow.add_edge(new_initial_nodes.labeling, done_node) + workflow.add_edge(new_initial_nodes.rework, done_node) + + # Verify that the IDs are preserved + assert new_initial_nodes.labeling.id == original_labeling_id, ( + f"InitialLabelingNode ID changed from {original_labeling_id} to {new_initial_nodes.labeling.id}. " + f"This will break the workflow!" + ) + assert new_initial_nodes.rework.id == original_rework_id, ( + f"InitialReworkNode ID changed from {original_rework_id} to {new_initial_nodes.rework.id}. " + f"This will break the workflow!" + ) + + # Verify that the configuration was updated + assert new_initial_nodes.labeling.instructions == "Updated instructions" + assert new_initial_nodes.labeling.max_contributions_per_user == 5 + assert ( + new_initial_nodes.rework.instructions + == "Updated rework instructions" + ) + assert new_initial_nodes.rework.max_contributions_per_user == 3 + + # Save and verify the workflow still works + workflow.update_config() + + # Reload the workflow and verify IDs are still preserved + reloaded_workflow = project.get_workflow() + reloaded_nodes = reloaded_workflow.get_nodes() + + labeling_node = next( + n + for n in reloaded_nodes + if n.definition_id == WorkflowDefinitionId.InitialLabelingTask + ) + rework_node = next( + n + for n in reloaded_nodes + if n.definition_id == WorkflowDefinitionId.InitialReworkTask + ) + + assert labeling_node.id == original_labeling_id + assert rework_node.id == original_rework_id + + finally: + project.delete() + + +def test_copy_workflow_preserves_initial_node_ids(client): + """Test that copy operations preserve existing initial node IDs.""" + # Create source and target projects + source_project = client.create_project( + name=f"Source Project {uuid.uuid4()}", media_type=MediaType.Image + ) + target_project = client.create_project( + name=f"Target Project {uuid.uuid4()}", media_type=MediaType.Image + ) + + try: + # Set up source workflow + source_workflow = source_project.get_workflow() + 
source_initial = source_workflow.reset_to_initial_nodes() + done_node = source_workflow.add_node(type=NodeType.Done) + source_workflow.add_edge(source_initial.labeling, done_node) + source_workflow.add_edge(source_initial.rework, done_node) + source_workflow.update_config() + + # Set up target workflow with its own initial nodes + target_workflow = target_project.get_workflow() + target_initial = target_workflow.reset_to_initial_nodes() + + # Record original target initial node IDs + original_target_labeling_id = target_initial.labeling.id + original_target_rework_id = target_initial.rework.id + + # Copy from source to target + target_workflow.copy_from(source_workflow) + + # Verify that target's initial node IDs are preserved + updated_nodes = target_workflow.get_nodes() + + labeling_node = next( + n + for n in updated_nodes + if n.definition_id == WorkflowDefinitionId.InitialLabelingTask + ) + rework_node = next( + n + for n in updated_nodes + if n.definition_id == WorkflowDefinitionId.InitialReworkTask + ) + + assert labeling_node.id == original_target_labeling_id, ( + f"Target InitialLabelingNode ID changed from {original_target_labeling_id} to {labeling_node.id}. " + f"This will break the workflow!" + ) + assert rework_node.id == original_target_rework_id, ( + f"Target InitialReworkNode ID changed from {original_target_rework_id} to {rework_node.id}. " + f"This will break the workflow!" 
+ ) + + # Verify the structure was copied (should have a Done node) + done_nodes = [ + n + for n in updated_nodes + if n.definition_id == WorkflowDefinitionId.Done + ] + assert len(done_nodes) == 1, "Done node should have been copied" + + finally: + source_project.delete() + target_project.delete() + + +def test_edge_id_format_is_correct(client): + """Test that edge IDs are generated using the correct format.""" + # Create a new project for this test + project_name = f"Edge ID Format Test {uuid.uuid4()}" + project = client.create_project( + name=project_name, media_type=MediaType.Image + ) + + try: + # Get workflow and create initial nodes + workflow = project.get_workflow() + initial_nodes = workflow.reset_to_initial_nodes() + + # Create a done node + done_node = workflow.add_node(type=NodeType.Done, name="Test Done") + + # Create edges with different handle types + edge1 = workflow.add_edge( + initial_nodes.labeling, done_node, NodeOutput.If + ) + edge2 = workflow.add_edge( + initial_nodes.rework, done_node, NodeOutput.If + ) + + # Verify edge ID format: xy-edge__{source}{sourceHandle}-{target}{targetHandle} + expected_edge1_id = ( + f"xy-edge__{initial_nodes.labeling.id}if-{done_node.id}in" + ) + expected_edge2_id = ( + f"xy-edge__{initial_nodes.rework.id}if-{done_node.id}in" + ) + + assert ( + edge1.id == expected_edge1_id + ), f"Edge ID format incorrect. Expected: {expected_edge1_id}, Got: {edge1.id}" + assert ( + edge2.id == expected_edge2_id + ), f"Edge ID format incorrect. 
Expected: {expected_edge2_id}, Got: {edge2.id}" + + # Verify edge properties are correct + assert edge1.source == initial_nodes.labeling.id + assert edge1.target == done_node.id + assert edge1.sourceHandle == "if" + assert edge1.targetHandle == "in" + + assert edge2.source == initial_nodes.rework.id + assert edge2.target == done_node.id + assert edge2.sourceHandle == "if" + assert edge2.targetHandle == "in" + + # Save and verify the workflow + workflow.update_config() + + # Reload workflow and check edge IDs are preserved + reloaded_workflow = project.get_workflow() + reloaded_edges = reloaded_workflow.get_edges() + + edge_ids = [edge.id for edge in reloaded_edges] + assert ( + expected_edge1_id in edge_ids + ), f"Edge ID {expected_edge1_id} not found after reload" + assert ( + expected_edge2_id in edge_ids + ), f"Edge ID {expected_edge2_id} not found after reload" + + finally: + project.delete() + + +def test_edge_id_format_with_different_handles(client): + """Test edge ID format with different source handle types.""" + # Create a new project for this test + project_name = f"Edge Handle Test {uuid.uuid4()}" + project = client.create_project( + name=project_name, media_type=MediaType.Image + ) + + try: + # Create workflow with review node + workflow = project.get_workflow() + initial_nodes = workflow.reset_to_initial_nodes() + + review_node = workflow.add_node( + type=NodeType.Review, name="Test Review" + ) + done_node = workflow.add_node(type=NodeType.Done, name="Approved") + rework_node = workflow.add_node(type=NodeType.Rework, name="Rejected") + + # Connect initial to review + workflow.add_edge(initial_nodes.labeling, review_node) + + # Create edges with different handle types + approved_edge = workflow.add_edge( + review_node, done_node, NodeOutput.Approved + ) + rejected_edge = workflow.add_edge( + review_node, rework_node, NodeOutput.Rejected + ) + + # Verify edge ID formats - NodeOutput.Approved maps to "if", NodeOutput.Rejected maps to "else" + 
expected_approved_id = f"xy-edge__{review_node.id}if-{done_node.id}in" + expected_rejected_id = ( + f"xy-edge__{review_node.id}else-{rework_node.id}in" + ) + + assert ( + approved_edge.id == expected_approved_id + ), f"Approved edge ID format incorrect. Expected: {expected_approved_id}, Got: {approved_edge.id}" + assert ( + rejected_edge.id == expected_rejected_id + ), f"Rejected edge ID format incorrect. Expected: {expected_rejected_id}, Got: {rejected_edge.id}" + + # Verify handle values - NodeOutput.Approved maps to "if", NodeOutput.Rejected maps to "else" + assert approved_edge.sourceHandle == "if" + assert rejected_edge.sourceHandle == "else" + assert approved_edge.targetHandle == "in" + assert rejected_edge.targetHandle == "in" + + finally: + project.delete()