Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Import RPG

Les données géométriques des parcelles de la France entière sont disponibles via le RPG, maintenu par le Ministère de l’Agriculture.

Chaque année, une nouvelle version est publiée sous forme d’archives .7z découpées en plusieurs parties (ex. RPG_2024_Ile-de-France.7z.001, RPG_2024_Ile-de-France.7z.002, etc.).

Chaque archive contient un fichier GeoPackage (.gpkg) avec les données parcellaires d’une région spécifique.

Concepts métier

TermeSignification
DatasetCollection annuelle (ex. “RPG 2024”). Statuts : DRAFT → COMPLETE → ACTIVE → ARCHIVED.
JobImport d’une région dans un dataset (ex. “RPG 2024 — Ile-de-France”).
RégionUne des 13 régions administratives françaises, codées en dur dans regions.ts.
ActivationPassage de isActiveRpg = true sur les parcelles issues des jobs complétés. Un seul dataset ACTIVE à la fois.
Multipart 7zArchives découpées en .7z.001, .7z.002, etc. Téléchargées séquentiellement, extraites comme une seule archive.

Flux

Admin UI → API (démarrer import) → file pg-boss → Worker
            events.emit(RPG_IMPORT_DISPATCH)       ├─ Téléchargement des fichiers sources
                                                   ├─ Extraction 7z → recherche rpg_parcelles.gpkg
                                                   ├─ Lecture GeoPackage (SQLite via bun:sqlite)
                                                   └─ Upsert des parcelles par lots de 500 (SQL brut)

Machine à états du workflow d’import

                          ┌─────────────────────────────────────┐
                          │                                     ▼
PENDING → DOWNLOADING → READY_TO_EXTRACT → EXTRACTING → PARSING → TRANSFORMING → IMPORTING → COMPLETED → ACTIVATED
   │           │                │               │            │           │             │          │
   │           └───────────────────────────────────────────────────────────────────────► FAILED   │
   │                            │                                                          │      │
   └────────────────────────────┴────────────────────────────────────────────────────► CANCELLED  │
                                                                                                  ▼
                                                                                               CANCELLED

FAILED est relançable quand :

  • Le statut est READY_TO_EXTRACT, ou
  • Le statut est FAILED et toutes les parties sont téléchargées (downloadedPartCount >= expectedPartCount), ou
  • Le statut est PENDING (job bloqué avant le démarrage du pipeline)

Mode DIRECT : saute READY_TO_EXTRACT — après le téléchargement, le pipeline continue directement vers EXTRACTING.

Mode MULTIPART_7Z : se met en pause à READY_TO_EXTRACT et attend qu’un admin déclenche manuellement l’étape d’extraction.

Il y a 2 modes d’import : DIRECT et MULTIPART_7Z. Le mode est déterminé automatiquement par le nombre de parties dans l’archive. DIRECT est utilisé pour les imports avec une seule archive .7z, tandis que MULTIPART_7Z est utilisé pour les imports avec des archives découpées en plusieurs parties.

Modules clés

transitionRpgImportJob(jobId, to, extraFields?)

Point d’entrée unique pour tous les changements de statut d’un job. Récupère le statut courant, valide la transition contre la machine à états, écrit de manière atomique. failRpgImportJob et completeRpgImportJob passent par cette fonction.

JobWorkspace

Propriétaire de tous les chemins filesystem du répertoire de travail d’un job :

  • workspace.dir — racine ($RPG_IMPORT_WORKDIR/<jobId>/)
  • workspace.downloadsDir — parties d’archive téléchargées
  • workspace.extractDir — GeoPackage extrait
  • workspace.cleanup() — supprime tout le workspace
  • workspace.cleanupExtracted() — supprime uniquement les artefacts extraits (conserve les téléchargements pour une nouvelle tentative)

Créé une seule fois dans pipeline.ts au démarrage du job, transmis aux fonctions d’archive.

logImportStep(jobId, step, message, level?)

Fonction simple. Écrit une ligne RpgImportLog. Niveau par défaut : INFO. Appelée depuis le pipeline et le geopackage-importer — pas d’instanciation de classe ni d’injection.

flushImportChunk(rows)

Exécute un pipeline PostgreSQL à 8 CTEs :

  1. input — liaison des valeurs de ligne
  2. prepared — cast des types, décodage WKB hex → géométrie PostGIS (SRID 2154)
  3. normalizedST_MakeValid sur les géométries invalides → ST_CollectionExtract(3)ST_Multi
  4. deduplicatedGROUP BY rpgSourceKey, ST_UnaryUnion pour les géométries multi-parties
  5. annotated — marquage isEmpty, alreadyExists
  6. upsertedINSERT … ON CONFLICT (rpgSourceKey) DO UPDATE
  7. SELECT — retourne les compteurs : insertedRows, updatedRows, skippedRows, validRows, invalidRows

Retourne ImportChunkStats. Aucun effet de bord sur le statut du job.

Décisions d’architecture

  1. SQL brut pour l’upsert : la normalisation géométrique PostGIS + l’upsert ON CONFLICT ne peuvent pas être exprimés via Prisma. flushImportChunk utilise prisma.$queryRawUnsafe.
  2. Streaming via bun:sqlite : lecture SQLite directe depuis le .gpkg, découpée en lots de 500 lignes. Pendant qu’un lot est flushé en DB, le suivant est parsé (pipeline via chaîne de promesses pendingFlush).
  3. pg-boss pour la durabilité : les jobs survivent aux redémarrages serveur. localConcurrency: 1 garantit le traitement séquentiel — pas d’imports concurrents, pas de race condition sur isActiveRpg.
  4. Clé source déterministe : hash SHA-256 de (sourceYear, regionCode, id_parcel, code_cultu, code_group, culture_d1, culture_d2, cat_cult_p) — relancer le même import fait un upsert au lieu de dupliquer.
  5. Limites de sécurité au téléchargement : 6 Gio par fichier, 30 Gio total, timeout de 20 minutes. Liste blanche d’hôtes via la variable d’environnement RPG_IMPORT_ALLOWED_HOSTS.
  6. Conservation des téléchargements en cas d’échec : si l’extraction ou l’import échoue mais que les téléchargements sont complets, le workspace est conservé. La nouvelle tentative saute le téléchargement et reprend directement à l’extraction.