Skip to content

Curator

synapseclient.extensions.curator

Synapse Curator Extensions

This module provides library functions for metadata curation tasks in Synapse.

Functions

create_file_based_metadata_task

create_file_based_metadata_task(folder_id: str, curation_task_name: str, instructions: str, attach_wiki: bool = False, entity_view_name: str = 'JSON Schema view', schema_uri: Optional[str] = None, enable_derived_annotations: bool = False, *, synapse_client: Optional[Synapse] = None) -> Tuple[str, str]

Create a file view for a schema-bound folder using schematic.

Creating a file-based metadata curation task with schema binding

In this example, we create an EntityView and CurationTask for file-based metadata curation. If a schema_uri is provided, it will be bound to the folder.

import synapseclient
from synapseclient.extensions.curator import create_file_based_metadata_task

syn = synapseclient.Synapse()
syn.login()

entity_view_id, task_id = create_file_based_metadata_task(
    synapse_client=syn,
    folder_id="syn12345678",
    curation_task_name="BiospecimenMetadataTemplate",
    instructions="Please curate this metadata according to the schema requirements",
    attach_wiki=False,
    entity_view_name="Biospecimen Metadata View",
    schema_uri="sage.schemas.v2571-amp.Biospecimen.schema-0.0.1"
)
PARAMETER DESCRIPTION
folder_id

The Synapse Folder ID to create the file view for.

TYPE: str

curation_task_name

Name for the CurationTask (used as data_type field). Must be unique within the project, otherwise if it matches an existing CurationTask, that task will be updated with new data.

TYPE: str

instructions

Instructions for the curation task.

TYPE: str

attach_wiki

Whether or not to attach a Synapse Wiki (default: False).

TYPE: bool DEFAULT: False

entity_view_name

Name for the created entity view (default: "JSON Schema view").

TYPE: str DEFAULT: 'JSON Schema view'

schema_uri

Optional JSON schema URI to bind to the folder. If provided, the schema will be bound to the folder before creating the entity view. (e.g., 'sage.schemas.v2571-amp.Biospecimen.schema-0.0.1')

TYPE: Optional[str] DEFAULT: None

enable_derived_annotations

If true, enable derived annotations. Defaults to False.

TYPE: bool DEFAULT: False

synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False) this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
Tuple[str, str]

A tuple containing the Synapse ID of the entity view created and the task ID of the curation task created.

RAISES DESCRIPTION
ValueError

If required parameters are missing.

SynapseError

If there are issues with Synapse operations.

Source code in synapseclient/extensions/curator/file_based_metadata_task.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
def create_file_based_metadata_task(
    folder_id: str,
    curation_task_name: str,
    instructions: str,
    attach_wiki: bool = False,
    entity_view_name: str = "JSON Schema view",
    schema_uri: Optional[str] = None,
    enable_derived_annotations: bool = False,
    *,
    synapse_client: Optional[Synapse] = None,
) -> Tuple[str, str]:
    """
    Create a file view for a schema-bound folder using schematic.

    Example: Creating a file-based metadata curation task with schema binding
        In this example, we create an EntityView and CurationTask for file-based
        metadata curation. If a schema_uri is provided, it will be bound to the folder.

        ```python
        import synapseclient
        from synapseclient.extensions.curator import create_file_based_metadata_task

        syn = synapseclient.Synapse()
        syn.login()

        entity_view_id, task_id = create_file_based_metadata_task(
            synapse_client=syn,
            folder_id="syn12345678",
            curation_task_name="BiospecimenMetadataTemplate",
            instructions="Please curate this metadata according to the schema requirements",
            attach_wiki=False,
            entity_view_name="Biospecimen Metadata View",
            schema_uri="sage.schemas.v2571-amp.Biospecimen.schema-0.0.1"
        )
        ```

    Arguments:
        folder_id: The Synapse Folder ID to create the file view for.
        curation_task_name: Name for the CurationTask (used as data_type field).
            Must be unique within the project, otherwise if it matches an existing
            CurationTask, that task will be updated with new data.
        instructions: Instructions for the curation task.
        attach_wiki: Whether or not to attach a Synapse Wiki (default: False).
        entity_view_name: Name for the created entity view (default: "JSON Schema view").
        schema_uri: Optional JSON schema URI to bind to the folder. If provided,
            the schema will be bound to the folder before creating the entity view.
            (e.g., 'sage.schemas.v2571-amp.Biospecimen.schema-0.0.1')
        enable_derived_annotations: If true, enable derived annotations. Defaults to False.
        synapse_client: If not passed in and caching was not disabled by
                `Synapse.allow_client_caching(False)` this will use the last created
                instance from the Synapse class constructor.

    Returns:
        A tuple containing:
          - The Synapse ID of the entity view created
          - The task ID of the curation task created

    Raises:
        ValueError: If required parameters are missing.
        SynapseError: If there are issues with Synapse operations.
    """
    # Validate required parameters up front so the caller gets a clear error
    # before any network calls are made.
    if not folder_id:
        raise ValueError("folder_id is required")
    if not curation_task_name:
        raise ValueError("curation_task_name is required")
    if not instructions:
        raise ValueError("instructions is required")

    synapse_client = Synapse.get_client(synapse_client=synapse_client)

    # Bind schema to folder if schema_uri is provided
    if schema_uri:
        synapse_client.logger.info(
            f"Attempting to bind schema {schema_uri} to folder {folder_id}."
        )
        try:
            folder = Folder(folder_id).get(synapse_client=synapse_client)
            folder.bind_schema(
                json_schema_uri=schema_uri,
                enable_derived_annotations=enable_derived_annotations,
                synapse_client=synapse_client,
            )
            synapse_client.logger.info(
                f"Successfully bound schema {schema_uri} to folder {folder_id}."
            )
        except Exception:
            synapse_client.logger.exception(
                f"Error binding schema {schema_uri} to folder {folder_id}"
            )
            # Bare `raise` re-raises the active exception with its original
            # traceback intact (idiomatic; `raise e` is redundant here).
            raise

    synapse_client.logger.info("Attempting to create entity view.")
    try:
        entity_view_id = create_json_schema_entity_view(
            syn=synapse_client,
            synapse_entity_id=folder_id,
            entity_view_name=entity_view_name,
        )
    except Exception:
        synapse_client.logger.exception("Error creating entity view")
        raise
    synapse_client.logger.info("Created entity view.")

    if attach_wiki:
        synapse_client.logger.info("Attempting to attach wiki.")
        try:
            create_or_update_wiki_with_entity_view(
                syn=synapse_client, entity_view_id=entity_view_id, owner_id=folder_id
            )
        except Exception:
            synapse_client.logger.exception("Error creating wiki")
            raise
        synapse_client.logger.info("Wiki attached.")

    # Validate that the folder has an attached JSON schema; the result is not
    # used directly — a failure here surfaces a misconfigured folder early.
    synapse_client.logger.info("Attempting to get the attached schema.")
    try:
        entity = get(folder_id, synapse_client=synapse_client)
        entity.get_schema(synapse_client=synapse_client)
    except Exception:
        synapse_client.logger.exception("Error getting the attached schema.")
        raise
    synapse_client.logger.info("Schema retrieval successful")

    synapse_client.logger.info(
        "Attempting to get the Synapse ID of the provided folders project."
    )
    try:
        entity = Folder(folder_id).get(synapse_client=synapse_client)
        # Walk up the parent chain until the containing Project is reached.
        # NOTE(review): assumes every folder hierarchy terminates in a
        # Project; if it did not, the root's parent lookup would raise
        # rather than loop forever — confirm against the Synapse data model.
        parent = synapse_client.get(entity.parent_id)
        while parent.concreteType != "org.sagebionetworks.repo.model.Project":
            parent = synapse_client.get(parent.parentId)
        project = parent
    except Exception:
        synapse_client.logger.exception(
            "Error getting the Synapse ID of the provided folders project"
        )
        raise
    synapse_client.logger.info("Got the Synapse ID of the provided folders project.")

    synapse_client.logger.info("Attempting to create the CurationTask.")
    try:
        # curation_task_name is used directly as the task's data_type.
        task = CurationTask(
            data_type=curation_task_name,
            project_id=project.id,
            instructions=instructions,
            task_properties=FileBasedMetadataTaskProperties(
                upload_folder_id=folder_id,
                file_view_id=entity_view_id,
            ),
        ).store(synapse_client=synapse_client)
    except Exception:
        synapse_client.logger.exception("Error creating the CurationTask.")
        raise
    synapse_client.logger.info("Created the CurationTask.")

    return (entity_view_id, task.task_id)

create_record_based_metadata_task

create_record_based_metadata_task(project_id: str, folder_id: str, record_set_name: str, record_set_description: str, curation_task_name: str, upsert_keys: List[str], instructions: str, schema_uri: str, bind_schema_to_record_set: bool = True, enable_derived_annotations: bool = False, *, synapse_client: Optional[Synapse] = None) -> Tuple[RecordSet, CurationTask, Grid]

Generate and upload CSV templates as a RecordSet for record-based metadata, create a CurationTask, and also create a Grid to bootstrap the ValidationStatistics.

A number of schema URIs that are already registered to Synapse can be found at: https://www.synapse.org/Synapse:syn69735275/tables/

If you have yet to create and register your JSON schema in Synapse, please refer to the tutorial at https://python-docs.synapse.org/en/stable/tutorials/python/json_schema/.

Creating a record-based metadata curation task with a schema URI

In this example, we create a RecordSet and CurationTask for biospecimen metadata curation using a schema URI. By default this will also bind the schema to the RecordSet, however the bind_schema_to_record_set parameter can be set to False to skip that step.

import synapseclient
from synapseclient.extensions.curator import create_record_based_metadata_task

syn = synapseclient.Synapse()
syn.login()

record_set, task, grid = create_record_based_metadata_task(
    synapse_client=syn,
    project_id="syn12345678",
    folder_id="syn87654321",
    record_set_name="BiospecimenMetadata_RecordSet",
    record_set_description="RecordSet for biospecimen metadata curation",
    curation_task_name="BiospecimenMetadataTemplate",
    upsert_keys=["specimenID"],
    instructions="Please curate this metadata according to the schema requirements",
    schema_uri="schema-org-schema.name.schema-v1.0.0"
)
PARAMETER DESCRIPTION
project_id

The Synapse ID of the project where the folder exists.

TYPE: str

folder_id

The Synapse ID of the folder to upload RecordSet to.

TYPE: str

record_set_name

Name for the RecordSet.

TYPE: str

record_set_description

Description for the RecordSet.

TYPE: str

curation_task_name

Name for the CurationTask (used as data_type field). Must be unique within the project, otherwise if it matches an existing CurationTask, that task will be updated with new data.

TYPE: str

upsert_keys

List of column names to use as upsert keys.

TYPE: List[str]

instructions

Instructions for the curation task.

TYPE: str

schema_uri

JSON schema URI for the RecordSet schema. (e.g., 'sage.schemas.v2571-amp.Biospecimen.schema-0.0.1', 'sage.schemas.v2571-ad.Analysis.schema-0.0.0')

TYPE: str

bind_schema_to_record_set

Whether to bind the given schema to the RecordSet (default: True).

TYPE: bool DEFAULT: True

enable_derived_annotations

If true, enable derived annotations. Defaults to False.

TYPE: bool DEFAULT: False

synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False) this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
Tuple[RecordSet, CurationTask, Grid]

Tuple containing the created RecordSet, CurationTask, and Grid objects

RAISES DESCRIPTION
ValueError

If required parameters are missing or if schema_uri is not provided.

SynapseError

If there are issues with Synapse operations.

Source code in synapseclient/extensions/curator/record_based_metadata_task.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
def create_record_based_metadata_task(
    project_id: str,
    folder_id: str,
    record_set_name: str,
    record_set_description: str,
    curation_task_name: str,
    upsert_keys: List[str],
    instructions: str,
    schema_uri: str,
    bind_schema_to_record_set: bool = True,
    enable_derived_annotations: bool = False,
    *,
    synapse_client: Optional[Synapse] = None,
) -> Tuple[RecordSet, CurationTask, Grid]:
    """
    Generate and upload CSV templates as a RecordSet for record-based metadata,
    create a CurationTask, and also create a Grid to bootstrap the ValidationStatistics.

    A number of schema URIs that are already registered to Synapse can be found at:

    - <https://www.synapse.org/Synapse:syn69735275/tables/>


    If you have yet to create and register your JSON schema in Synapse, please refer to
    the tutorial at <https://python-docs.synapse.org/en/stable/tutorials/python/json_schema/>.


    Example: Creating a record-based metadata curation task with a schema URI
        In this example, we create a RecordSet and CurationTask for biospecimen metadata
        curation using a schema URI. By default this will also bind the schema to the
        RecordSet, however the `bind_schema_to_record_set` parameter can be set to
        False to skip that step.


        ```python
        import synapseclient
        from synapseclient.extensions.curator import create_record_based_metadata_task

        syn = synapseclient.Synapse()
        syn.login()

        record_set, task, grid = create_record_based_metadata_task(
            synapse_client=syn,
            project_id="syn12345678",
            folder_id="syn87654321",
            record_set_name="BiospecimenMetadata_RecordSet",
            record_set_description="RecordSet for biospecimen metadata curation",
            curation_task_name="BiospecimenMetadataTemplate",
            upsert_keys=["specimenID"],
            instructions="Please curate this metadata according to the schema requirements",
            schema_uri="schema-org-schema.name.schema-v1.0.0"
        )
        ```

    Arguments:
        project_id: The Synapse ID of the project where the folder exists.
        folder_id: The Synapse ID of the folder to upload RecordSet to.
        record_set_name: Name for the RecordSet.
        record_set_description: Description for the RecordSet.
        curation_task_name: Name for the CurationTask (used as data_type field).
            Must be unique within the project, otherwise if it matches an existing
            CurationTask, that task will be updated with new data.
        upsert_keys: List of column names to use as upsert keys.
        instructions: Instructions for the curation task.
        schema_uri: JSON schema URI for the RecordSet schema.
            (e.g., 'sage.schemas.v2571-amp.Biospecimen.schema-0.0.1', 'sage.schemas.v2571-ad.Analysis.schema-0.0.0')
        bind_schema_to_record_set: Whether to bind the given schema to the RecordSet
            (default: True).
        enable_derived_annotations: If true, enable derived annotations. Defaults to False.
        synapse_client: If not passed in and caching was not disabled by
                `Synapse.allow_client_caching(False)` this will use the last created
                instance from the Synapse class constructor.

    Returns:
        Tuple containing the created RecordSet, CurationTask, and Grid objects

    Raises:
        ValueError: If required parameters are missing or if schema_uri is not provided.
        SynapseError: If there are issues with Synapse operations.
    """
    import os

    # Validate required parameters
    if not project_id:
        raise ValueError("project_id is required")
    if not folder_id:
        raise ValueError("folder_id is required")
    if not record_set_name:
        raise ValueError("record_set_name is required")
    if not record_set_description:
        raise ValueError("record_set_description is required")
    if not curation_task_name:
        raise ValueError("curation_task_name is required")
    if not upsert_keys:
        raise ValueError("upsert_keys is required and must be a non-empty list")
    if not instructions:
        raise ValueError("instructions is required")
    if not schema_uri:
        raise ValueError("schema_uri is required")

    synapse_client = Synapse.get_client(synapse_client=synapse_client)

    template_df = extract_schema_properties_from_web(
        syn=synapse_client, schema_uri=schema_uri
    )
    synapse_client.logger.info(
        f"Extracted schema properties and created template: {template_df.columns.tolist()}"
    )

    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv")
    # Close the handle immediately: only the path is needed, and the file is
    # re-opened below for writing. Leaving it open leaked a file descriptor.
    tmp.close()
    try:
        try:
            with open(tmp.name, "w", encoding="utf-8") as f:
                template_df.to_csv(f, index=False)
        except Exception:
            synapse_client.logger.exception(
                "Error writing template to temporary CSV file"
            )
            raise

        try:
            record_set_with_data = RecordSet(
                name=record_set_name,
                parent_id=folder_id,
                description=record_set_description,
                path=tmp.name,
                upsert_keys=upsert_keys,
            ).store(synapse_client=synapse_client)
            record_set_id = record_set_with_data.id
            synapse_client.logger.info(
                f"Created RecordSet with ID: {record_set_id} in folder {folder_id}"
            )

            if bind_schema_to_record_set:
                record_set_with_data.bind_schema(
                    json_schema_uri=schema_uri,
                    enable_derived_annotations=enable_derived_annotations,
                    synapse_client=synapse_client,
                )
                synapse_client.logger.info(
                    f"Bound schema {schema_uri} to RecordSet ID: {record_set_id}"
                )
        except Exception:
            synapse_client.logger.exception("Error creating RecordSet in Synapse")
            raise
    finally:
        # The temp file was created with delete=False and was previously never
        # removed; clean it up once the RecordSet upload has consumed it.
        try:
            os.remove(tmp.name)
        except OSError:
            pass

    try:
        curation_task = CurationTask(
            data_type=curation_task_name,
            project_id=project_id,
            instructions=instructions,
            task_properties=RecordBasedMetadataTaskProperties(
                record_set_id=record_set_id,
            ),
        ).store(synapse_client=synapse_client)
        synapse_client.logger.info(
            f"Created CurationTask ({curation_task.task_id}) with name {curation_task_name}"
        )
    except Exception:
        # Use logger.exception for consistency with the other error paths in
        # this module (previously logger.error, which dropped the traceback).
        synapse_client.logger.exception("Error creating CurationTask in Synapse")
        raise

    try:
        curation_grid: Grid = Grid(
            record_set_id=record_set_id,
        )
        curation_grid.create(synapse_client=synapse_client)
        curation_grid = curation_grid.export_to_record_set(
            synapse_client=synapse_client
        )
        synapse_client.logger.info(
            f"Created Grid view for RecordSet ID: {record_set_id} for curation task {curation_task_name}"
        )
    except Exception:
        synapse_client.logger.exception("Error creating Grid view in Synapse")
        raise

    return record_set_with_data, curation_task, curation_grid

generate_jsonld

generate_jsonld(schema: Any, data_model_labels: DisplayLabelType, output_jsonld: Optional[str], *, synapse_client: Optional[Synapse] = None) -> dict

Convert a CSV data model specification to JSON-LD format with validation and error checking.

This function parses your CSV data model (containing attributes, validation rules, dependencies, and valid values), converts it to a graph-based JSON-LD representation, validates the structure for common errors, and saves the result. The generated JSON-LD file serves as input for generate_jsonschema() and other data model operations.

Data Model Requirements:

Your CSV should include columns defining:

  • Attribute names: Property/attribute identifiers
  • Display names: Human-readable labels (optional but recommended)
  • Descriptions: Documentation for each attribute
  • Valid values: Allowed enum values for attributes (comma-separated)
  • Validation rules: Rules like list, regex, inRange, required, etc.
  • Dependencies: Relationships between attributes using dependsOn
  • Required status: Whether attributes are mandatory

Validation Checks Performed:

  • Ensures all required fields (like displayName) are present
  • Detects cycles in attribute dependencies (which would create invalid schemas)
  • Checks for blacklisted characters in display names that Synapse doesn't allow
  • Validates that attribute names don't conflict with reserved system names
  • Verifies the graph structure is a valid directed acyclic graph (DAG)
PARAMETER DESCRIPTION
schema

Path or URL to your data model CSV file. Can be a local file path or a URL (e.g., from GitHub). This file should contain your complete data model specification with all attributes, validation rules, and relationships.

TYPE: Any

data_model_labels

Label format for the JSON-LD output:

  • "class_label" (default, recommended): Uses standard attribute names as labels
  • "display_label": Uses display names as labels if they contain no blacklisted characters (parentheses, periods, spaces, hyphens), otherwise falls back to class labels. Use cautiously as this can affect downstream compatibility.

TYPE: DisplayLabelType

output_jsonld

Path where the JSON-LD file will be saved. If None, saves alongside the input CSV with a .jsonld extension (e.g., model.csv → model.jsonld).

TYPE: Optional[str]

synapse_client

Optional Synapse client instance for logging. If None, creates a new client instance. Use Synapse.get_client() or pass an authenticated client.

TYPE: Optional[Synapse] DEFAULT: None

Output:

The function logs validation errors and warnings to help you fix data model issues before generating JSON schemas. Errors indicate critical problems that must be fixed, while warnings suggest improvements but won't block schema generation.

RETURNS DESCRIPTION
dict

The generated data model as a dictionary in JSON-LD format. The same data is also saved to the file path specified in output_jsonld.

Using this function to generate JSONLD Schema files:

Basic usage with default output path:

from synapseclient import Synapse
from synapseclient.extensions.curator import generate_jsonld

syn = Synapse()
syn.login()

jsonld_model = generate_jsonld(
    schema="path/to/my_data_model.csv",
    data_model_labels="class_label",
    output_jsonld=None,  # Saves to my_data_model.jsonld
    synapse_client=syn
)

Specify custom output path:

jsonld_model = generate_jsonld(
    schema="models/patient_model.csv",
    data_model_labels="class_label",
    output_jsonld="~/output/patient_model_v1.jsonld",
    synapse_client=syn
)

Use display labels:

jsonld_model = generate_jsonld(
    schema="my_model.csv",
    data_model_labels="display_label",
    output_jsonld="my_model.jsonld",
    synapse_client=syn
)

Load from URL:

jsonld_model = generate_jsonld(
    schema="https://raw.githubusercontent.com/org/repo/main/model.csv",
    data_model_labels="class_label",
    output_jsonld="downloaded_model.jsonld",
    synapse_client=syn
)

Source code in synapseclient/extensions/curator/schema_generation.py
5738
5739
5740
5741
5742
5743
5744
5745
5746
5747
5748
5749
5750
5751
5752
5753
5754
5755
5756
5757
5758
5759
5760
5761
5762
5763
5764
5765
5766
5767
5768
5769
5770
5771
5772
5773
5774
5775
5776
5777
5778
5779
5780
5781
5782
5783
5784
5785
5786
5787
5788
5789
5790
5791
5792
5793
5794
5795
5796
5797
5798
5799
5800
5801
5802
5803
5804
5805
5806
5807
5808
5809
5810
5811
5812
5813
5814
5815
5816
5817
5818
5819
5820
5821
5822
5823
5824
5825
5826
5827
5828
5829
5830
5831
5832
5833
5834
5835
5836
5837
5838
5839
5840
5841
5842
5843
5844
5845
5846
5847
5848
5849
5850
5851
5852
5853
5854
5855
5856
5857
5858
5859
5860
5861
5862
5863
5864
5865
5866
5867
5868
5869
5870
5871
5872
5873
5874
5875
5876
5877
5878
5879
5880
5881
5882
5883
5884
5885
5886
5887
5888
5889
5890
5891
5892
5893
5894
5895
5896
5897
5898
5899
5900
5901
5902
5903
5904
5905
5906
5907
5908
5909
5910
5911
5912
5913
5914
5915
5916
5917
5918
5919
5920
5921
5922
5923
5924
5925
5926
5927
5928
5929
def generate_jsonld(
    schema: Any,
    data_model_labels: DisplayLabelType,
    output_jsonld: Optional[str],
    *,
    synapse_client: Optional[Synapse] = None,
) -> dict:
    """
    Convert a CSV data model specification to JSON-LD format with validation and error checking.

    This function parses your CSV data model (containing attributes, validation rules,
    dependencies, and valid values), converts it to a graph-based JSON-LD representation,
    validates the structure for common errors, and saves the result. The generated JSON-LD
    file serves as input for `generate_jsonschema()` and other data model operations.

    **Data Model Requirements:**

    Your CSV should include columns defining:

    - **Attribute names**: Property/attribute identifiers
    - **Display names**: Human-readable labels (optional but recommended)
    - **Descriptions**: Documentation for each attribute
    - **Valid values**: Allowed enum values for attributes (comma-separated)
    - **Validation rules**: Rules like `list`, `regex`, `inRange`, `required`, etc.
    - **Dependencies**: Relationships between attributes using `dependsOn`
    - **Required status**: Whether attributes are mandatory

    **Validation Checks Performed:**

    - Ensures all required fields (like `displayName`) are present
    - Detects cycles in attribute dependencies (which would create invalid schemas)
    - Checks for blacklisted characters in display names that Synapse doesn't allow
    - Validates that attribute names don't conflict with reserved system names
    - Verifies the graph structure is a valid directed acyclic graph (DAG)

    Arguments:
        schema: Path or URL to your data model CSV file. Can be a local file path or a URL
            (e.g., from GitHub). This file should contain your complete data model
            specification with all attributes, validation rules, and relationships.
        data_model_labels: Label format for the JSON-LD output:

            - `"class_label"` (default, recommended): Uses standard attribute names as labels
            - `"display_label"`: Uses display names as labels if they contain no blacklisted
              characters (parentheses, periods, spaces, hyphens), otherwise falls back to
              class labels. Use cautiously as this can affect downstream compatibility.
        output_jsonld: Path where the JSON-LD file will be saved. If None, saves alongside
            the input CSV with a `.jsonld` extension (e.g., `model.csv` → `model.jsonld`).
        synapse_client: Optional Synapse client instance for logging. If None, creates a
            new client instance. Use `Synapse.get_client()` or pass an authenticated client.

    **Output:**

    The function logs validation errors and warnings to help you fix data model issues
    before generating JSON schemas. Errors indicate critical problems that must be fixed,
    while warnings suggest improvements but won't block schema generation.

    Returns:
        The generated data model as a dictionary in JSON-LD format. The same data is
            also saved to the file path specified in `output_jsonld`.


    Example: Using this function to generate JSONLD Schema files:
        Basic usage with default output path:

        ```python
        from synapseclient import Synapse
        from synapseclient.extensions.curator import generate_jsonld

        syn = Synapse()
        syn.login()

        jsonld_model = generate_jsonld(
            schema="path/to/my_data_model.csv",
            data_model_labels="class_label",
            output_jsonld=None,  # Saves to my_data_model.jsonld
            synapse_client=syn
        )
        ```

        Specify custom output path:

        ```python
        jsonld_model = generate_jsonld(
            schema="models/patient_model.csv",
            data_model_labels="class_label",
            output_jsonld="~/output/patient_model_v1.jsonld",
            synapse_client=syn
        )
        ```

        Use display labels:
        ```python
        jsonld_model = generate_jsonld(
            schema="my_model.csv",
            data_model_labels="display_label",
            output_jsonld="my_model.jsonld",
            synapse_client=syn
        )
        ```

        Load from URL:
        ```python
        jsonld_model = generate_jsonld(
            schema="https://raw.githubusercontent.com/org/repo/main/model.csv",
            data_model_labels="class_label",
            output_jsonld="downloaded_model.jsonld",
            synapse_client=syn
        )
        ```
    """
    check_curator_imports()
    syn = Synapse.get_client(synapse_client=synapse_client)

    # Instantiate Parser
    data_model_parser = DataModelParser(path_to_data_model=schema, logger=syn.logger)

    # Parse Model
    syn.logger.info("Parsing data model.")
    parsed_data_model = data_model_parser.parse_model()

    # Convert parsed model to graph
    # Instantiate DataModelGraph
    data_model_grapher = DataModelGraph(
        parsed_data_model, data_model_labels, syn.logger
    )

    # Generate graphschema
    syn.logger.info("Generating data model graph.")
    graph_data_model = data_model_grapher.graph

    # Validate generated data model.
    syn.logger.info("Validating the data model internally.")
    data_model_validator = DataModelValidator(graph=graph_data_model, logger=syn.logger)
    data_model_errors, data_model_warnings = data_model_validator.run_checks()

    # If there are errors log them.
    if data_model_errors:
        for err in data_model_errors:
            if isinstance(err, str):
                syn.logger.error(err)
            elif isinstance(err, list):
                for error in err:
                    syn.logger.error(error)

    # If there are warnings log them.
    if data_model_warnings:
        for war in data_model_warnings:
            if isinstance(war, str):
                syn.logger.warning(war)
            elif isinstance(war, list):
                for warning in war:
                    syn.logger.warning(warning)

    syn.logger.info("Converting data model to JSON-LD")
    jsonld_data_model = convert_graph_to_jsonld(
        graph=graph_data_model, logger=syn.logger
    )

    # output JSON-LD file alongside CSV file by default, get path.
    if output_jsonld is None:
        if ".jsonld" not in schema:
            # If schema is a URL, extract just the filename for local output
            schema_path = schema
            if schema.startswith("http://") or schema.startswith("https://"):
                from urllib.parse import urlparse

                parsed_url = urlparse(schema)
                schema_path = os.path.basename(parsed_url.path)
            csv_no_ext = re.sub("[.]csv$", "", schema_path)
            output_jsonld = csv_no_ext + ".jsonld"
        else:
            output_jsonld = schema

        syn.logger.info(
            "By default, the JSON-LD output will be stored alongside the first "
            f"input CSV or JSON-LD file. In this case, it will appear here: '{output_jsonld}'. "
            "You can use the `--output_jsonld` argument to specify another file path."
        )

    # saving updated schema.org schema
    try:
        export_schema(
            schema=jsonld_data_model, file_path=output_jsonld, logger=syn.logger
        )
    except Exception:
        syn.logger.exception(
            (
                f"The Data Model could not be created by using '{output_jsonld}' location. "
                "Please check your file path again"
            )
        )
    return jsonld_data_model

generate_jsonschema

generate_jsonschema(data_model_source: str, synapse_client: Synapse, data_types: Optional[list[str]] = None, output: Optional[str] = None, data_model_labels: DisplayLabelType = 'class_label') -> tuple[list[dict[str, Any]], list[str]]

Generate JSON Schema files from a data model.

PARAMETER DESCRIPTION
data_model_source

Path or URL to the data model file (CSV or JSONLD). Can accept: - A local CSV file with your data model specification (will be parsed automatically) - A local JSONLD file generated from generate_jsonld() or equivalent - A URL pointing to a raw CSV data model (e.g., from GitHub) - A URL pointing to a raw JSONLD data model (e.g., from GitHub)

TYPE: str

synapse_client

Synapse client instance for logging. Use Synapse.get_client() or pass an existing authenticated client.

TYPE: Synapse

data_types

List of specific data types to generate schemas for. If None, generates schemas for all data types in the data model.

TYPE: Optional[list[str]] DEFAULT: None

output

One of: None, a directory path, or a file path. - If None, schemas will be written to the current working directory, with filenames formatted as <DataType>.json. - If a directory path, schemas will be written to that directory, with filenames formatted as <Output>/<DataType>.json. - If a file path (must end with .json) and a single data type is specified, the schema for that data type will be written to that file.

TYPE: Optional[str] DEFAULT: None

data_model_labels

Label format for properties in the generated schema: - "class_label" (default): Uses standard attribute names as property keys - "display_label": Uses display names if valid (no blacklisted characters).

TYPE: DisplayLabelType DEFAULT: 'class_label'

RETURNS DESCRIPTION
tuple[list[dict[str, Any]], list[str]]

A tuple containing: - A list of JSON schema dictionaries, each corresponding to a data type - A list of file paths where the schemas were written

RAISES DESCRIPTION
ValueError

If a single output file is specified but multiple data types are requested.

Using this function to generate JSON Schema files:

Generate schema for one datatype:

from synapseclient import Synapse
from synapseclient.extensions.curator import generate_jsonschema

syn = Synapse()
syn.login()

schemas, file_paths = generate_jsonschema(
    data_model_source="path/to/model.csv",
    output="output.json",
    data_types=["Patient"],
    synapse_client=syn
)

Generate schema for specific data types:

schemas, file_paths = generate_jsonschema(
    data_model_source="path/to/model.csv",
    output="./schemas",
    data_types=["Patient", "Biospecimen"],
    synapse_client=syn
)

Generate schemas for all data types:

schemas, file_paths = generate_jsonschema(
    data_model_source="path/to/model.csv",
    output="./schemas",
    synapse_client=syn
)

Generate schema from CSV URL:

schemas, file_paths = generate_jsonschema(
    data_model_source="https://raw.githubusercontent.com/org/repo/main/model.csv",
    output="./schemas",
    data_types=None,
    data_model_labels="class_label",
    synapse_client=syn
)
Source code in synapseclient/extensions/curator/schema_generation.py
5592
5593
5594
5595
5596
5597
5598
5599
5600
5601
5602
5603
5604
5605
5606
5607
5608
5609
5610
5611
5612
5613
5614
5615
5616
5617
5618
5619
5620
5621
5622
5623
5624
5625
5626
5627
5628
5629
5630
5631
5632
5633
5634
5635
5636
5637
5638
5639
5640
5641
5642
5643
5644
5645
5646
5647
5648
5649
5650
5651
5652
5653
5654
5655
5656
5657
5658
5659
5660
5661
5662
5663
5664
5665
5666
5667
5668
5669
5670
5671
5672
5673
5674
5675
5676
5677
5678
5679
5680
5681
5682
5683
5684
5685
5686
5687
5688
5689
5690
5691
5692
5693
5694
5695
5696
5697
5698
5699
5700
5701
5702
5703
5704
5705
5706
5707
5708
5709
5710
5711
5712
5713
5714
5715
5716
5717
5718
5719
5720
5721
5722
5723
5724
5725
5726
5727
5728
5729
5730
5731
5732
5733
5734
5735
def generate_jsonschema(
    data_model_source: str,
    synapse_client: Synapse,
    data_types: Optional[list[str]] = None,
    output: Optional[str] = None,
    data_model_labels: DisplayLabelType = "class_label",
) -> tuple[list[dict[str, Any]], list[str]]:
    """
    Generate JSON Schema files from a data model.

    Arguments:
        data_model_source: Path or URL to the data model file (CSV or JSONLD). Can accept:
            - A local CSV file with your data model specification (will be parsed automatically)
            - A local JSONLD file generated from `generate_jsonld()` or equivalent
            - A URL pointing to a raw CSV data model (e.g., from GitHub)
            - A URL pointing to a raw JSONLD data model (e.g., from GitHub)
        synapse_client: Synapse client instance for logging. Use `Synapse.get_client()`
            or pass an existing authenticated client.
        data_types: List of specific data types to generate schemas for. If None, generates schemas for all data types in the data model.
        output: One of: None, a directory path, or a file path.
            - If None, schemas will be written to the current working directory, with filenames formatted as `<DataType>.json`.
            - If a directory path, schemas will be written to that directory, with filenames formatted as `<Output>/<DataType>.json`.
            - If a file path (must end with `.json`) and a single data type is specified, the schema for that data type will be written to that file.
        data_model_labels: Label format for properties in the generated schema:
            - `"class_label"` (default): Uses standard attribute names as property keys
            - `"display_label"`: Uses display names if valid (no blacklisted characters).

    Returns:
        A tuple containing:
            - A list of JSON schema dictionaries, each corresponding to a data type
            - A list of file paths where the schemas were written

    Raises:
        ValueError: If a single output file is specified but multiple data types are requested,
            or if no data types can be found in the data model.

    Example: Using this function to generate JSON Schema files:
        Generate schema for one datatype:

        ```python
        from synapseclient import Synapse
        from synapseclient.extensions.curator import generate_jsonschema

        syn = Synapse()
        syn.login()

        schemas, file_paths = generate_jsonschema(
            data_model_source="path/to/model.csv",
            output="output.json",
            data_types=["Patient"],
            synapse_client=syn
        )
        ```

        Generate schema for specific data types:

        ```python
        schemas, file_paths = generate_jsonschema(
            data_model_source="path/to/model.csv",
            output="./schemas",
            data_types=["Patient", "Biospecimen"],
            synapse_client=syn
        )
        ```

        Generate schemas for all data types:

        ```python
        schemas, file_paths = generate_jsonschema(
            data_model_source="path/to/model.csv",
            output="./schemas",
            synapse_client=syn
        )
        ```

        Generate schema from CSV URL:

        ```python
        schemas, file_paths = generate_jsonschema(
            data_model_source="https://raw.githubusercontent.com/org/repo/main/model.csv",
            output="./schemas",
            data_types=None,
            data_model_labels="class_label",
            synapse_client=syn
        )
        ```
    """
    check_curator_imports()

    # Parse the source model and build the graph used to explore data types.
    data_model_parser = DataModelParser(
        path_to_data_model=data_model_source, logger=synapse_client.logger
    )
    parsed_data_model = data_model_parser.parse_model()
    data_model_graph = DataModelGraph(parsed_data_model)
    graph_data_model = data_model_graph.graph
    dmge = DataModelGraphExplorer(graph_data_model, logger=synapse_client.logger)

    # Gets all data types if none are specified: every class flagged as a
    # template ('IsTemplate' column) is treated as a data type.
    if data_types is None or len(data_types) == 0:
        data_types = [
            node for node in dmge.find_classes() if dmge.get_node_is_template(node)
        ]

    if len(data_types) == 0:
        msg = (
            "No data types found in the data model. "
            "Please ensure the data model is correctly specified. "
            "Use the 'IsTemplate' column in your data model to define data types."
        )
        raise ValueError(msg)

    # A '.json' output path means "write everything to this single file",
    # which is only valid for exactly one data type.
    single_file_output = output is not None and output.endswith(".json")
    if single_file_output and len(data_types) != 1:
        raise ValueError(
            f"Cannot write {len(data_types)} schemas to single file '{output}'. "
            "Specify a directory path instead, or request only one data type."
        )

    # Resolve the target directory and create it only after validation so an
    # invalid call does not leave an empty directory behind.
    if output is None:
        dirname = "./"
    elif single_file_output:
        dirname = os.path.dirname(output)
        dirname = dirname if dirname else "./"
    else:
        dirname = output
    os.makedirs(dirname, exist_ok=True)

    if single_file_output:
        schema_paths = [output]
    else:
        schema_paths = [
            os.path.join(dirname, f"{data_type}.json") for data_type in data_types
        ]

    schemas = [
        create_json_schema(
            dmge=dmge,
            datatype=data_type,
            schema_name=data_type,
            logger=synapse_client.logger,
            write_schema=True,
            schema_path=schema_path,
            use_property_display_names=(data_model_labels == "display_label"),
        )
        for data_type, schema_path in zip(data_types, schema_paths)
    ]
    return schemas, schema_paths

query_schema_registry

query_schema_registry(synapse_client: Optional[Synapse] = None, schema_registry_table_id: Optional[str] = None, column_config: Optional[SchemaRegistryColumnConfig] = None, return_latest_only: bool = True, **filters) -> Union[str, List[str], None]

Query the schema registry table to find schemas matching the provided filters.

This function searches the Synapse schema registry table for schemas that match the provided filter parameters. Results are sorted by version in descending order (newest first). The function supports any number of filter parameters as long as they are configured in the column_config.

PARAMETER DESCRIPTION
synapse_client

Optional authenticated Synapse client instance

TYPE: Optional[Synapse] DEFAULT: None

schema_registry_table_id

Optional Synapse ID of the schema registry table. If None, uses the default table ID.

TYPE: Optional[str] DEFAULT: None

column_config

Optional configuration for custom column names. If None, uses default configuration ('version' and 'uri' columns).

TYPE: Optional[SchemaRegistryColumnConfig] DEFAULT: None

return_latest_only

If True (default), returns only the latest URI as a string. If False, returns all matching URIs as a list of strings.

TYPE: bool DEFAULT: True

**filters

Filter parameters to search for matching schemas. These work as follows:

  Column-Based Filtering:
  - Any column name in the schema registry table can be used as a filter
  - Pass column names directly as keyword arguments
  - Common filters: dcc, datatype, version, uri
  - Any additional columns in your table can be used

  Filter Values:
  - Exact matching: Use plain strings (e.g., dcc="ad")
  - Pattern matching: Use SQL LIKE patterns with wildcards:
    * % = any sequence of characters
  - Examples:
    * dcc="ad" → matches exactly "ad"
    * datatype="%spec%" → matches any datatype containing "spec"

  Filter Logic:
  - Multiple filters are combined with AND (all must match)
  - At least one filter must be provided

DEFAULT: {}

RETURNS DESCRIPTION
Union[str, List[str], None]

If return_latest_only is True: Single URI string of the latest version, or None if not found

Union[str, List[str], None]

If return_latest_only is False: List of URI strings sorted by version (highest version first)

RAISES DESCRIPTION
ValueError

If no filter parameters are provided

Expected Table Structure

The schema registry table should contain columns for:

  • Schema version for sorting (default: 'version')
  • JSON schema URI (default: 'uri')
  • Any filterable columns as configured in column_config

Additional columns may be present and will be included in results.

Comprehensive filter usage demonstrations

This includes several examples of how to use the filtering system.

Basic Filtering (using default filters):

from synapseclient import Synapse
from synapseclient.extensions.curator import query_schema_registry

syn = Synapse()
syn.login()

# 1. Get latest schema URI for a specific DCC and datatype
latest_uri = query_schema_registry(
    synapse_client=syn,
    dcc="ad",  # Exact match for Alzheimer's Disease DCC
    datatype="Analysis"  # Exact datatype match
)
# Returns: "sage.schemas.v2571-ad.Analysis.schema-0.0.0"

# 2. Get all versions of matching schemas (not just latest)
all_versions = query_schema_registry(
    synapse_client=syn,
    dcc="mc2",
    datatype="Biospecimen",
    return_latest_only=False
)
# Returns: ["MultiConsortiaCoordinatingCenter-Biospecimen-12.0.0",
#           "sage.schemas.v2571-mc2.Biospecimen.schema-9.0.0"]

# 3. Pattern matching with wildcards
# Find all "Biospecimen" schemas across all DCCs
biospecimen_schemas = query_schema_registry(
    synapse_client=syn,
    datatype="Biospecimen",  # Exact match for Biospecimen
    return_latest_only=False
)
# Returns: ["MultiConsortiaCoordinatingCenter-Biospecimen-12.0.0",
#           "sage.schemas.v2571-mc2.Biospecimen.schema-9.0.0",
#           "sage.schemas.v2571-veo.Biospecimen.schema-0.3.0",
#           "sage.schemas.v2571-amp.Biospecimen.schema-0.0.1"]

# 4. Pattern matching for DCC variations
mc2_schemas = query_schema_registry(
    synapse_client=syn,
    dcc="%C2",  # Matches 'mc2' and 'MC2'
    return_latest_only=False
)
# Returns schemas from both 'mc2' and 'MC2' DCCs

# 5. Using additional columns for filtering (if they exist in your table)
specific_schemas = query_schema_registry(
    synapse_client=syn,
    dcc="amp",  # Must be AMP DCC
    org="sage.schemas.v2571",  # Must match organization
    return_latest_only=False
)
# Returns schemas that match BOTH conditions

Direct Column Filtering (simplified approach):

# Any column in the schema registry table can be used for filtering
# Just use the column name directly as a keyword argument

# Basic filters using standard columns
query_schema_registry(dcc="ad", datatype="Analysis")
query_schema_registry(version="0.0.0")
query_schema_registry(uri="sage.schemas.v2571-ad.Analysis.schema-0.0.0")

# Additional columns (if they exist in your table)
query_schema_registry(org="sage.schemas.v2571")
query_schema_registry(name="ad.Analysis.schema")

# Multiple column filters (all must match)
query_schema_registry(
    dcc="mc2",
    datatype="Biospecimen",
    org="MultiConsortiaCoordinatingCenter"
)

Filter Value Examples with Real Data:

# Exact matching
query_schema_registry(dcc="ad")                   # Returns schemas with dcc="ad"
query_schema_registry(datatype="Biospecimen")     # Returns schemas with datatype="Biospecimen"
query_schema_registry(dcc="MC2")                  # Returns schemas with dcc="MC2" (case sensitive)

# Pattern matching with wildcards
query_schema_registry(dcc="%C2")                   # Matches "mc2", "MC2"
query_schema_registry(datatype="%spec%")           # Matches "Biospecimen"

# Examples with expected results:
query_schema_registry(dcc="ad", datatype="Analysis")
# Returns: "sage.schemas.v2571-ad.Analysis.schema-0.0.0"

query_schema_registry(datatype="Biospecimen", return_latest_only=False)
# Returns: ["MultiConsortiaCoordinatingCenter-Biospecimen-12.0.0",
#           "sage.schemas.v2571-mc2.Biospecimen.schema-9.0.0", ...]

# Multiple conditions (all must be true)
query_schema_registry(
    dcc="amp",             # AND
    datatype="Biospecimen", # AND
    org="sage.schemas.v2571"  # AND (if org column exists)
)
# Returns: ["sage.schemas.v2571-amp.Biospecimen.schema-0.0.1"]

Source code in synapseclient/extensions/curator/schema_registry.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
def query_schema_registry(
    synapse_client: Optional[Synapse] = None,
    schema_registry_table_id: Optional[str] = None,
    column_config: Optional[SchemaRegistryColumnConfig] = None,
    return_latest_only: bool = True,
    **filters,
) -> Union[str, List[str], None]:
    """
    Query the schema registry table to find schemas matching the provided filters.

    This function searches the Synapse schema registry table for schemas that match
    the provided filter parameters. Results are sorted by version in descending order
    (newest first). The function supports any number of filter parameters as long as
    they are configured in the column_config.

    Arguments:
        synapse_client: Optional authenticated Synapse client instance
        schema_registry_table_id: Optional Synapse ID of the schema registry table.
                                  If None, uses the default table ID.
        column_config: Optional configuration for custom column names.
                      If None, uses default configuration ('version' and 'uri' columns).
        return_latest_only: If True (default), returns only the latest URI as a string.
                           If False, returns all matching URIs as a list of strings.
        **filters: Filter parameters to search for matching schemas. These work as follows:

                  Column-Based Filtering:
                  - Any column name in the schema registry table can be used as a filter
                  - Pass column names directly as keyword arguments
                  - Common filters: dcc, datatype, version, uri
                  - Any additional columns in your table can be used

                  Filter Values:
                  - Exact matching: Use plain strings (e.g., dcc="ad")
                  - Pattern matching: Use SQL LIKE patterns with wildcards:
                    * % = any sequence of characters
                  - Examples:
                    * dcc="ad" → matches exactly "ad"
                    * datatype="%spec%" → matches any datatype containing "spec"

                  Filter Logic:
                  - Multiple filters are combined with AND (all must match)
                  - At least one filter must be provided

    Returns:
        If return_latest_only is True: Single URI string of the latest version, or None if not found
        If return_latest_only is False: List of URI strings sorted by version (highest version first)

    Raises:
        ValueError: If no filter parameters are provided

    Expected Table Structure:
        The schema registry table should contain columns for:

        - Schema version for sorting (default: 'version')
        - JSON schema URI (default: 'uri')
        - Any filterable columns as configured in column_config

        Additional columns may be present and will be included in results.

    Example: Comprehensive filter usage demonstrations
        This includes several examples of how to use the filtering system.

        Basic Filtering (using default filters):
        ```python
        from synapseclient import Synapse
        from synapseclient.extensions.curator import query_schema_registry

        syn = Synapse()
        syn.login()

        # 1. Get latest schema URI for a specific DCC and datatype
        latest_uri = query_schema_registry(
            synapse_client=syn,
            dcc="ad",  # Exact match for Alzheimer's Disease DCC
            datatype="Analysis"  # Exact datatype match
        )
        # Returns: "sage.schemas.v2571-ad.Analysis.schema-0.0.0"

        # 2. Get all versions of matching schemas (not just latest)
        all_versions = query_schema_registry(
            synapse_client=syn,
            dcc="mc2",
            datatype="Biospecimen",
            return_latest_only=False
        )
        # Returns: ["MultiConsortiaCoordinatingCenter-Biospecimen-12.0.0",
        #           "sage.schemas.v2571-mc2.Biospecimen.schema-9.0.0"]

        # 3. Pattern matching with wildcards
        # Find all "Biospecimen" schemas across all DCCs
        biospecimen_schemas = query_schema_registry(
            synapse_client=syn,
            datatype="Biospecimen",  # Exact match for Biospecimen
            return_latest_only=False
        )
        # Returns: ["MultiConsortiaCoordinatingCenter-Biospecimen-12.0.0",
        #           "sage.schemas.v2571-mc2.Biospecimen.schema-9.0.0",
        #           "sage.schemas.v2571-veo.Biospecimen.schema-0.3.0",
        #           "sage.schemas.v2571-amp.Biospecimen.schema-0.0.1"]

        # 4. Pattern matching for DCC variations
        mc2_schemas = query_schema_registry(
            synapse_client=syn,
            dcc="%C2",  # Matches 'mc2' and 'MC2'
            return_latest_only=False
        )
        # Returns schemas from both 'mc2' and 'MC2' DCCs

        # 5. Using additional columns for filtering (if they exist in your table)
        specific_schemas = query_schema_registry(
            synapse_client=syn,
            dcc="amp",  # Must be AMP DCC
            org="sage.schemas.v2571",  # Must match organization
            return_latest_only=False
        )
        # Returns schemas that match BOTH conditions
        ```

        Direct Column Filtering (simplified approach):
        ```python
        # Any column in the schema registry table can be used for filtering
        # Just use the column name directly as a keyword argument

        # Basic filters using standard columns
        query_schema_registry(dcc="ad", datatype="Analysis")
        query_schema_registry(version="0.0.0")
        query_schema_registry(uri="sage.schemas.v2571-ad.Analysis.schema-0.0.0")

        # Additional columns (if they exist in your table)
        query_schema_registry(org="sage.schemas.v2571")
        query_schema_registry(name="ad.Analysis.schema")

        # Multiple column filters (all must match)
        query_schema_registry(
            dcc="mc2",
            datatype="Biospecimen",
            org="MultiConsortiaCoordinatingCenter"
        )
        ```

        Filter Value Examples with Real Data:
        ```python
        # Exact matching
        query_schema_registry(dcc="ad")                   # Returns schemas with dcc="ad"
        query_schema_registry(datatype="Biospecimen")     # Returns schemas with datatype="Biospecimen"
        query_schema_registry(dcc="MC2")                  # Returns schemas with dcc="MC2" (case sensitive)

        # Pattern matching with wildcards
        query_schema_registry(dcc="%C2")                   # Matches "mc2", "MC2"
        query_schema_registry(datatype="%spec%")           # Matches "Biospecimen"

        # Examples with expected results:
        query_schema_registry(dcc="ad", datatype="Analysis")
        # Returns: "sage.schemas.v2571-ad.Analysis.schema-0.0.0"

        query_schema_registry(datatype="Biospecimen", return_latest_only=False)
        # Returns: ["MultiConsortiaCoordinatingCenter-Biospecimen-12.0.0",
        #           "sage.schemas.v2571-mc2.Biospecimen.schema-9.0.0", ...]

        # Multiple conditions (all must be true)
        query_schema_registry(
            dcc="amp",             # AND
            datatype="Biospecimen", # AND
            org="sage.schemas.v2571"  # AND (if org column exists)
        )
        # Returns: ["sage.schemas.v2571-amp.Biospecimen.schema-0.0.1"]
        ```
    """
    syn = Synapse.get_client(synapse_client=synapse_client)
    logger = syn.logger

    # Use provided table ID or default
    table_id = (
        schema_registry_table_id
        if schema_registry_table_id
        else SCHEMA_REGISTRY_TABLE_ID
    )

    # Use provided column config or default
    if column_config is None:
        column_config = SchemaRegistryColumnConfig()

    # Validate that we have at least one filter
    if not filters:
        raise ValueError("At least one filter parameter must be provided")

    # Build WHERE clause from filters using column names directly.
    # Filter values are embedded as SQL string literals, so single quotes are
    # doubled ('') to keep the query well-formed (e.g. a value like
    # "Alzheimer's") and to avoid injection through filter values.
    # NOTE(review): column names come from **filters kwargs, which Python
    # restricts to valid identifiers, so they are not escaped here.
    where_conditions = []
    for column_name, filter_value in filters.items():
        escaped_value = str(filter_value).replace("'", "''")
        # Check if the value contains SQL wildcards (% or _)
        if isinstance(filter_value, str) and (
            "%" in filter_value or "_" in filter_value
        ):
            # Use LIKE for pattern matching
            where_conditions.append(f"{column_name} LIKE '{escaped_value}'")
        else:
            # Use exact match
            where_conditions.append(f"{column_name} = '{escaped_value}'")

    where_clause = " AND ".join(where_conditions)

    # Construct SQL query using configurable column names
    # Results are sorted by version in descending order (newest first)
    query = f"""
    SELECT * FROM {table_id}
    WHERE {where_clause}
    ORDER BY {column_config.version_column} DESC
    """

    # Create a readable filter summary for logging
    filter_summary = ", ".join([f"{k}='{v}'" for k, v in filters.items()])

    logger.info(f"Querying schema registry with filters: {filter_summary}")
    logger.info(f"Using table: {table_id}")
    logger.info(f"SQL Query: {query}")

    # Query the table and get results as a pandas DataFrame
    table = Table(id=table_id)
    results_df = table.query(query=query, synapse_client=syn)

    if results_df.empty:
        logger.info(f"No schemas found matching filters: {filter_summary}")
        return None if return_latest_only else []

    # Extract URIs from the results and return as a list of strings
    uri_list = results_df[column_config.uri_column].tolist()

    logger.info(f"Found {len(uri_list)} matching schema(s):")
    for i, uri in enumerate(uri_list, 1):
        logger.info(f"  {i}. URI: {uri}")

    if return_latest_only:
        # Rows are ordered newest-first, so the first URI is the latest.
        return uri_list[0] if uri_list else None
    else:
        return uri_list