The NACC Data Platform captures visit events throughout the data processing pipeline to track submission and quality control outcomes. Events are logged to S3 for downstream processing and analytics.
This document explains how the event capture system works, and is intended for developers unfamiliar with it.
The system captures two types of events: `submit` events, logged when visit data is submitted, and `pass-qc` events, logged when a visit passes quality control.
Understanding the Flywheel container structure is essential:
```mermaid
graph TD
    A[Project: ingest-form] --> B[Project Files<br/>CSV uploads & QC logs]
    A --> C[Subject: 110001 PTID]
    B --> B1[multi-visit-upload.csv<br/>uploaded by user]
    B --> B2[110001_2024-01-15_UDS_qc-status.log<br/>tracks QC for visit 01]
    B --> B3[110001_2024-02-20_UDS_qc-status.log<br/>tracks QC for visit 02]
    C --> D[Session: FORMS-VISIT-01<br/>VISIT LEVEL]
    C --> E[Session: FORMS-VISIT-02<br/>VISIT LEVEL]
    D --> F[Acquisition: UDS<br/>MODULE LEVEL]
    F --> G[NACC000001_FORMS-VISIT-01_UDS.json<br/>visit data file]
    E --> H[Acquisition: UDS<br/>MODULE LEVEL]
    H --> I[NACC000001_FORMS-VISIT-02_UDS.json<br/>visit data file]

    style A fill:#f0f0f0,stroke:#333,stroke-width:2px
    style B fill:#ffe6e6,stroke:#cc0000,stroke-width:2px
    style D fill:#e1f5ff,stroke:#0066cc,stroke-width:3px
    style E fill:#e1f5ff,stroke:#0066cc,stroke-width:3px
    style F fill:#fff4e1,stroke:#cc6600,stroke-width:2px
    style H fill:#fff4e1,stroke:#cc6600,stroke-width:2px
```
Key Points:
- QC status logs at the PROJECT level track each visit's QC outcome (named `{ptid}_{visitdate}_{module}_qc-status.log`)

A single CSV file can contain data for multiple visits. The pipeline processes each visit separately:
Important Notes:
Understanding where files are stored is important for event capture:
- QC status logs (PROJECT level): `{ptid}_{visitdate}_{module}_qc-status.log`, e.g. `110001_2024-01-15_UDS_qc-status.log`
- Visit JSON files (ACQUISITION level): `{naccid}_FORMS-VISIT-{visitnum}_{module}.json` (for modules with visit numbers), e.g. `NACC000001_FORMS-VISIT-01_UDS.json`

The identifier-lookup gear captures submit events during CSV processing:
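As a sketch of the naming patterns above, a hypothetical helper for the QC log name might look like this (the pipeline itself derives QC log names via ErrorLogTemplate):

```python
from datetime import date


def qc_status_log_name(ptid: str, visitdate: date, module: str) -> str:
    """Build a QC status log filename following the
    {ptid}_{visitdate}_{module}_qc-status.log pattern."""
    return f"{ptid}_{visitdate.isoformat()}_{module}_qc-status.log"


# Example:
qc_status_log_name("110001", date(2024, 1, 15), "UDS")
# -> "110001_2024-01-15_UDS_qc-status.log"
```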
Key Benefits:
The form-scheduler gear captures pass-qc events during finalization queue processing:
Key Benefits:
There are two distinct workflows in the form processing system:
```mermaid
flowchart TD
    A[CSV File Uploaded] --> B[form-screening]
    B --> C[form-scheduler]
    C --> D[nacc-file-validator]
    D --> E[identifier-lookup]
    E -->|Submit events logged here| F[form-transformer]
    F --> G[form-qc-coordinator]
    G --> H[form-qc-checker]
    H --> I[Pipeline Completes]
    I --> J[JSON files enter finalization queue]
    J --> K[form-scheduler processes finalization queue]
    K -->|Pass-QC events logged here| L[Done]

    style E fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    style K fill:#fff3e0,stroke:#f57c00,stroke-width:2px
```
```mermaid
flowchart TD
    A[QC Passed JSON File] --> B[form-screening]
    B --> C[form-scheduler]
    C --> D[form-qc-coordinator]
    D --> E[form-qc-checker<br/>for associated modules<br/>and subsequent visits]
    E --> F[Pipeline Completes]
    F --> G[JSON files enter finalization queue]
    G --> H[form-scheduler processes finalization queue]
    H -->|Pass-QC events logged here| I[Done]

    style H fill:#fff3e0,stroke:#f57c00,stroke-width:2px
```
During CSV processing, identifier-lookup captures submit events:
```mermaid
sequenceDiagram
    participant IdentifierLookup
    participant CSVCaptureVisitor
    participant S3EventLog

    IdentifierLookup->>IdentifierLookup: Process CSV file
    loop For each CSV row
        IdentifierLookup->>CSVCaptureVisitor: visit_row(row, line_num)
        CSVCaptureVisitor->>CSVCaptureVisitor: Validate required fields:<br/>- PTID<br/>- ADCID<br/>- MODULE<br/>- date field (module-specific)
        alt All required fields present
            CSVCaptureVisitor->>CSVCaptureVisitor: Create "submit" event<br/>(timestamp = file upload time)
            CSVCaptureVisitor->>S3EventLog: capture_event(submit event)
        else Missing required fields
            CSVCaptureVisitor->>CSVCaptureVisitor: Skip event capture
        end
    end
    CSVCaptureVisitor-->>IdentifierLookup: Done
```
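The per-row logic in the loop above can be sketched roughly as follows. This is a simplification: the real CSVCaptureVisitor takes more configuration (module configs, error writer), and the row field names here are illustrative.

```python
from datetime import datetime
from typing import Any, Callable, Dict

# Required identifier fields; the module-specific date field is checked separately.
REQUIRED_FIELDS = ("ptid", "adcid", "module")


def visit_row(row: Dict[str, Any], date_field: str, upload_time: datetime,
              capture_event: Callable[[Dict[str, Any]], None]) -> bool:
    """Capture a "submit" event for one CSV row, or skip it when any
    required field is missing (the alt/else branch in the diagram)."""
    fields = REQUIRED_FIELDS + (date_field,)
    if any(not row.get(field) for field in fields):
        return False  # missing required fields: skip event capture
    capture_event({
        "action": "submit",
        "ptid": row["ptid"],
        "module": row["module"],
        "visit_date": row[date_field],
        "timestamp": upload_time.isoformat(),  # file upload time, not QC time
    })
    return True
```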
When processing the finalization queue, form-scheduler captures pass-qc events:
```mermaid
sequenceDiagram
    participant FormScheduler
    participant EventAccumulator
    participant Flywheel
    participant S3EventLog

    FormScheduler->>FormScheduler: Process finalization queue
    FormScheduler->>EventAccumulator: capture_events(json_file, project)
    EventAccumulator->>EventAccumulator: Generate QC log filename<br/>using ErrorLogTemplate
    EventAccumulator->>Flywheel: Find qc-status log file<br/>at PROJECT level
    Flywheel-->>EventAccumulator: QC status log file
    EventAccumulator->>EventAccumulator: Check QC status using<br/>FileQCModel.get_file_status()
    alt QC status is PASS
        EventAccumulator->>EventAccumulator: Extract visit metadata from:<br/>1. QC status custom info, or<br/>2. JSON file metadata
        EventAccumulator->>EventAccumulator: Create "pass-qc" event<br/>(timestamp = QC log modified time)
        EventAccumulator->>S3EventLog: capture_event(pass-qc event)
    else QC status is not PASS
        EventAccumulator->>EventAccumulator: Skip event capture
    end
    EventAccumulator-->>FormScheduler: Done
```
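The branch logic above can be sketched roughly as follows. This is a simplified stand-in: the real EventAccumulator works against Flywheel file objects, and the dict fields here are illustrative.

```python
from typing import Any, Callable, Dict, List, Optional


def capture_pass_qc_event(qc_log: Optional[Dict[str, Any]],
                          qc_status_of: Callable[[Dict[str, Any]], str],
                          event_log: List[Dict[str, Any]]) -> bool:
    """Emit a "pass-qc" event only when a QC status log was found and its
    aggregate status is PASS; otherwise skip (the else branch above)."""
    if qc_log is None:
        return False  # no qc-status log found at PROJECT level
    if qc_status_of(qc_log) != "PASS":
        return False  # QC status is not PASS: skip event capture
    event_log.append({
        "action": "pass-qc",
        "ptid": qc_log.get("ptid"),
        "visit_date": qc_log.get("visitdate"),
        "timestamp": qc_log.get("modified"),  # QC log modified time
    })
    return True
```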
What happens:
QC pass status is determined by checking the qc-status log file.
CRITICAL: a "pass-qc" event is ONLY captured when the QC status log indicates an overall PASS:
```python
def _check_qc_status(qc_log_file: FileEntry) -> bool:
    """Check if QC status is PASS.

    Returns True ONLY if FileQCModel indicates PASS status.
    """
    try:
        qc_model = FileQCModel.model_validate(qc_log_file.info)
    except ValidationError:
        return False

    # Check if QC status is PASS
    file_status = qc_model.get_file_status()
    return file_status == QC_STATUS_PASS
```
Each gear writes its validation status to file.info.qc. Both QC log files (at PROJECT level) and JSON files (at ACQUISITION level) contain this metadata structure.
For "pass-qc" events, we check the QC status log file at PROJECT level using FileQCModel.
Filename: 110001_2024-01-15_UDS_qc-status.log
```yaml
file.info:
  qc:
    form-screening:
      validation:
        state: "PASS" | "FAIL" | "IN REVIEW"
        data: [FileError, FileError, ...]
        cleared: [...]
    form-transformer:
      validation:
        state: "PASS"
        data: []
    form-qc-checker:
      validation:
        state: "PASS"
        data: []
```
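The real PASS check lives in FileQCModel.get_file_status(). A hypothetical sketch of aggregating the per-gear states shown above (the exact precedence rules are an assumption for illustration):

```python
from typing import Any, Dict


def get_file_status(info: Dict[str, Any]) -> str:
    """Aggregate per-gear validation states into one file status.
    Assumed precedence: any FAIL -> FAIL; all PASS -> PASS;
    anything else (including no states yet) -> IN REVIEW."""
    states = [gear.get("validation", {}).get("state")
              for gear in info.get("qc", {}).values()]
    if any(state == "FAIL" for state in states):
        return "FAIL"
    if states and all(state == "PASS" for state in states):
        return "PASS"
    return "IN REVIEW"
```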
Filename: NACC000001_FORMS-VISIT-01_UDS.json
```yaml
file.info:
  forms:
    json:
      ptid: "110001"
      visitnum: "01"
      visitdate: "2024-01-15"
      packet: "I"
      module: "UDS"
      # ... other form fields
  qc:
    # Same structure as QC log file
    form-screening:
      validation:
        state: "PASS"
    form-transformer:
      validation:
        state: "PASS"
    form-qc-checker:
      validation:
        state: "PASS"
```
Key Points:
- Both file types carry the same `file.info.qc` structure with gear validation states

Events use different timestamps to reflect when actions actually occurred:
When a visit passes QC, two events exist for it: the earlier `submit` event (timestamped with the file upload time) and a `pass-qc` event (timestamped with the QC log's modified time).

When a visit fails QC, only the `submit` event is captured; no `pass-qc` event is logged.
Event object logged to S3:
```python
class VisitEvent(BaseModel):
    action: str            # "submit" or "pass-qc"
    study: str             # Study identifier (extracted from project label)
    pipeline_adcid: int    # ADCID for event routing
    project_label: str     # Project name (e.g., "ingest-form", "ingest-form-dvcid")
    center_label: str      # Center name
    gear_name: str         # "identifier-lookup" or "form-scheduler"
    ptid: str              # Participant ID
    visit_date: date       # Visit date
    visit_number: str      # Visit number (optional for some modules)
    datatype: str          # "form"
    module: str            # "UDS", "FTLD", "LBD", etc.
    packet: Optional[str]  # Packet type (optional)
    timestamp: datetime    # When action occurred
```
Metadata extracted from QC status or JSON files:
```python
class VisitMetadata(BaseModel):
    ptid: str               # Participant ID
    date: date              # Visit date
    visitnum: Optional[str] # Visit number
    module: str             # Module name
    packet: Optional[str]   # Packet type
```
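A minimal sketch of extracting these fields from a visit file's file.info.forms.json block, using plain dicts (field names follow the JSON file example earlier; the real extraction also falls back to QC status custom info):

```python
from typing import Any, Dict, Optional


def extract_visit_metadata(file_info: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Pull VisitMetadata fields from a visit file's file.info.forms.json
    block; return None when the required fields are absent."""
    form_data = file_info.get("forms", {}).get("json", {})
    if not form_data.get("ptid") or not form_data.get("module"):
        return None  # cannot build metadata without required fields
    return {
        "ptid": form_data["ptid"],
        "date": form_data.get("visitdate"),
        "visitnum": form_data.get("visitnum"),  # optional for some modules
        "module": form_data["module"],
        "packet": form_data.get("packet"),      # optional
    }
```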
Submit events are captured during CSV processing using the CSVCaptureVisitor.
During CSV processing:
```python
# Create CSV capture visitor for submit events
csv_capture_visitor = CSVCaptureVisitor(
    center_label=center.label,
    project_label=project.label,
    gear_name="identifier-lookup",
    event_capture=event_capture,
    module_configs=module_configs,
    error_writer=error_writer,
    timestamp=file.created,  # Use file upload time
    action="submit",
    datatype="form",
)

# Process CSV with multiple visitors including event capture
csv_reader.accept_visitors([
    naccid_lookup_visitor,
    csv_capture_visitor,  # Captures submit events
    qc_status_visitor,
])
```
Pass-qc events are captured during finalization queue processing.
After finalization queue processing:
```python
def _log_pipeline_events(self, json_file: FileEntry) -> None:
    """Capture pass-qc events for finalized JSON files."""
    if not self.__event_capture:
        return
    try:
        from form_scheduler_app.event_accumulator import EventAccumulator

        event_accumulator = EventAccumulator(event_capture=self.__event_capture)
        event_accumulator.capture_events(json_file=json_file, project=self.__project)
    except Exception as error:
        log.warning(f"Failed to capture events for {json_file.name}: {error}")
```
The event capture process includes robust error handling:
Events are written to S3 in a flat structure organized by environment.
```
s3://event-bucket/
├── prod/
│   ├── log-submit-{YYYYMMDD-HHMMSS}-{adcid}-{project}-{ptid}-{visitnum}.json
│   └── log-pass-qc-{YYYYMMDD-HHMMSS}-{adcid}-{project}-{ptid}-{visitnum}.json
└── dev/
    └── ...
```
Filename Format: `log-{action}-{timestamp}-{adcid}-{project}-{ptid}-{visitnum}.json`
Where:
- `{action}`: event type (`submit`, `pass-qc`)
- `{timestamp}`: `YYYYMMDD-HHMMSS` (when the action occurred)
- `{adcid}`: pipeline ADCID
- `{project}`: project label
- `{ptid}`: participant ID
- `{visitnum}`: visit number

For a visit with:
- environment: `prod`
- ADCID: `42`
- project: `ingest-form` (ADRC study)
- PTID: `110001`
- visit number: `01`
- submit time: `2024-01-15T10:00:00Z`
- QC pass time: `2024-01-15T10:20:00Z`

Events are written to:
```
s3://event-bucket/prod/log-submit-20240115-100000-42-ingest-form-110001-01.json
s3://event-bucket/prod/log-pass-qc-20240115-102000-42-ingest-form-110001-01.json
```
For a DVCID study visit:
```
s3://event-bucket/prod/log-submit-20240115-100000-44-ingest-form-dvcid-110003-01.json
```
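A minimal sketch of building these keys from event fields:

```python
from datetime import datetime


def event_key(env: str, action: str, timestamp: datetime, adcid: int,
              project: str, ptid: str, visitnum: str) -> str:
    """Build the flat S3 object key:
    {env}/log-{action}-{YYYYMMDD-HHMMSS}-{adcid}-{project}-{ptid}-{visitnum}.json"""
    stamp = timestamp.strftime("%Y%m%d-%H%M%S")
    return f"{env}/log-{action}-{stamp}-{adcid}-{project}-{ptid}-{visitnum}.json"
```

For the example visit above, `event_key("prod", "submit", datetime(2024, 1, 15, 10, 0, 0), 42, "ingest-form", "110001", "01")` yields `prod/log-submit-20240115-100000-42-ingest-form-110001-01.json`.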
Each event file contains a JSON object with the complete VisitEvent data:
```json
{
  "action": "submit",
  "study": "adrc",
  "pipeline_adcid": 42,
  "project_label": "ingest-form",
  "center_label": "alpha",
  "gear_name": "identifier-lookup",
  "ptid": "110001",
  "visit_date": "2024-01-15",
  "visit_number": "01",
  "datatype": "form",
  "module": "UDS",
  "packet": "I",
  "timestamp": "2024-01-15T10:00:00Z"
}
```
The flat structure optimizes for the primary use case: scraping all events into a single Parquet table.
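As an illustration of that scrape, the event fields can be recovered from each key before loading; the regex below is an assumption for illustration, not the production parser:

```python
import re
from typing import Dict, Optional

# Project labels may themselves contain hyphens (e.g. "ingest-form-dvcid"),
# so the project group is matched greedily between fixed-shape neighbors.
_KEY_RE = re.compile(
    r"log-(?P<action>submit|pass-qc)-(?P<timestamp>\d{8}-\d{6})-"
    r"(?P<adcid>\d+)-(?P<project>.+)-(?P<ptid>[^-]+)-(?P<visitnum>[^-]+)\.json$")


def parse_event_key(key: str) -> Optional[Dict[str, str]]:
    """Recover the event fields from a flat S3 key, or None for
    non-event objects."""
    match = _KEY_RE.search(key)
    return match.groupdict() if match else None
```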
Advantages:
The "pass-qc" event is captured when visits successfully complete QC validation:
Not all modules include visit numbers in their session labels. For modules without visit numbers:
During CSV processing in identifier-lookup:
Event capture in the form processing pipeline is split between two gears: identifier-lookup captures `submit` events during CSV processing, and form-scheduler captures `pass-qc` events while processing the finalization queue.
Key Points: