Intégration des rsa au format delta table avec { pypmsi, polars / spark }

Author

gpr

Published

September 8, 2024


Objectif

L’objectif de ce travail était de tester la faisabilité d’harmoniser les 4 tables de RSA sur l’historique 2013 – 2023 pour les intégrer dans un schéma homogène de données souhaité par Delta Lake (il est souhaité / préférable que le type, l’ordre et le nom des colonnes soient identiques pour faire du APPEND ; c’est en tout cas le cas avec polars).

  • La première partie de ce document se concentre sur cette partie de normalisation des données (au sens où on les force à avoir toutes le même format d’année en année
  • La 2ème partie teste l’intégration des données RSA de 2013 M12 à 2023 M12 sous forme de fichiers parquet écrit avec le module polars
  • La 3ème partie comprend des tests de requêtage des données avec polars uniquement ; une fonction remote_table('rsa@rsa') permet d’accéder aux données via un LazyFrame polars flêchant vers les fichiers parquet
  • La dernière partie montre le contenu d’un des fichiers json qui historise les étapes d’alimentations de la table rsa@rsa au format Delta

Au cours de ce test, un partitionnement par année-mois a été expérimenté mais les temps de requêtages étaient augmentés de manière importante (* 4, selon les volumes intégrés). L’option de mono-partition semble préférable tant que les volumes de données sont raisonnables.

Cette étape d’intégration en gardant toutes les colonnes des données PMSI sans en exclure aucune pourrait correspondre à l’étape de la couche bronze du lac de données, avec les données brutes associées.

N’ont pas été fait :

  • tester la commande SQL ADD COLUMN / ALTER TABLE sur certaines tables si les formats 2024 des données PMSI changeait, et pour les années à suivre.
  • créer la clé primaire contenant l’année de sortie et le numéro de RSS accolés permettant d’avoir une unicité dans les relations
  • tester la même chose sur les RSS, la même donnée contenant le lien avec les passages dans les unités médicales, mais cela peut être adapté de manière très similaire


import pandas as pd
import polars as pl
import pypmsi as pm

import pyspark
from delta import *
from delta.tables import *
from pyspark.sql.functions import *

Intégrations

Paramètres

p = pm.noyau_pmsi(finess = 290000017, 
              annee  = 2023,
              mois   = 12,
              path   = '/Users/guillaumepressiat/Documents/data/mco')

#param_n_max = 1000
param_n_max = None

Fonctions utiles

def normalized_schema():
    # Définir un schéma type de données

    # On récupère des années antérieures la dispersion des formats pmsi entre 2011 et 2023, 
    # cela permettra de mettre en cohérence les données sous un seul "schéma" avant de
    # les insérer en delta table

    rsa_temp = {'rsa' : pl.DataFrame(), 'actes' : pl.DataFrame(), 'diags' : pl.DataFrame(), 'rsa_um' : pl.DataFrame()}
    
    for i in range(2023, 2012, -1):
        # print(i)
        temp = p.irsa(annee = i, mois =  12, typi = 6, n_rows = 1000)
        
        # concat avec how = diagonal ici, on ajoute des colonnes si elles manquent
        rsa_temp['rsa'] = pl.concat([rsa_temp['rsa'], temp['rsa']], how="diagonal")
        rsa_temp['actes'] = pl.concat([rsa_temp['actes'], temp['actes']], how="diagonal")
        rsa_temp['diags'] = pl.concat([rsa_temp['diags'], temp['diags']], how="diagonal")
        rsa_temp['rsa_um'] = pl.concat([rsa_temp['rsa_um'], temp['rsa_um']], how="diagonal")

    temp = p.irsa(annee = 2023, mois =  12, typi = 6, n_rows = 1000)
    
    # pour aller vite, 2023 est à part car mois = 4
    rsa_temp['rsa'] = pl.concat([rsa_temp['rsa'], temp['rsa']], how="diagonal")
    rsa_temp['actes'] = pl.concat([rsa_temp['actes'], temp['actes']], how="diagonal")
    rsa_temp['diags'] = pl.concat([rsa_temp['diags'], temp['diags']], how="diagonal")
    rsa_temp['rsa_um'] = pl.concat([rsa_temp['rsa_um'], temp['rsa_um']], how="diagonal")
    
    rsa_temp['rsa'] = rsa_temp['rsa'].join(p.itra(annee = 2023, mois = 12, n_rows = 1000), how = 'left', on = 'cle_rsa')

    rsa_temp['rsa'] = (rsa_temp['rsa']
     .with_columns(pl.concat_str(['ansor', 'moissor']).alias('periode'),
                  pl.concat_str(['ansor', 'moissor']).alias('am')))
    
    empty = dict()
    empty['rsa'] = pl.DataFrame(schema = rsa_temp['rsa'].schema)
    empty['actes'] = pl.DataFrame(schema = rsa_temp['actes'].schema)
    empty['diags'] = pl.DataFrame(schema = rsa_temp['diags'].schema)
    empty['rsa_um'] = pl.DataFrame(schema = rsa_temp['rsa_um'].schema)

    return empty

def normalize_schema_rsa(dfd, table):
    # pour appliquer le schéma type de données si il n'est pas respecté

    # Sur la base des formats de toutes les années (unions permissives)
    # on concatène les rsa d'une année avec ces formats normalisés > on génère des colonnes null
    # pour avoir un schéma commun à toutes les années

    # how = align ici, et on ordonne les colonnes si elles ne sont pas dans le même ordre
    if table != 'rsa':
        df = (pl.concat([dfd[table],empty[table]], how = 'align')
              .select(sorted(empty[table].columns))
              .join(dfd['rsa'].select('cle_rsa', 'am', 'periode', 'nas', 
                                      'norss', 'ansor', 'moissor'), 
                    on = 'cle_rsa', how = 'inner').with_columns(
               pl.when(pl.col(pl.Utf8) == "")
            .then(None)
            .otherwise(pl.col(pl.Utf8))
            .name.keep()
        ))
    else:
        df = pl.concat([empty[table], dfd['rsa']], how = 'align').with_columns(
            pl.when(pl.col(pl.Utf8) == "")
            .then(None)
            .otherwise(pl.col(pl.Utf8)) # keep original value
            .name.keep()
        ).select(sorted(empty[table].columns))

    return df

# on prépare les rsa d'une année (on les normalise vis-à-vis du schéma unifié
# avec la fonction au dessus
# + aussi ajout du fichier tra et de deux colonnes de temps
def prepare_rsa(p, annee1, mois1, n_max = param_n_max):
    # Lire les données et les mettre sous le bon schéma

    
    rsa = p.irsa(typi = 6, annee = annee1, mois = mois1, n_rows = n_max)
    tra = p.itra(annee = annee1, mois = mois1, n_rows = n_max)
    rsa['rsa'] = rsa['rsa'].join(tra, on = 'cle_rsa', how = 'inner')
    rsa['rsa'] = (rsa['rsa']
                  .with_columns(pl.concat_str(['ansor', 'moissor']).alias('periode'),
                                pl.concat_str(['ansor', 'moissor']).alias('am')))
    rsa = {k: normalize_schema_rsa(rsa, k) for k, v in rsa.items()}

    return rsa


def one_delta(dfd, table):
    # pour écrire une des tables des rsa au format delta
    #print(dfd[table].schema)
    #print(dfd[table].shape)
    
    dfd[table].write_delta(
        "/Users/guillaumepressiat/Documents/data/delta/rsa/" + table, 
        #large_dtypes = True,
                    mode = "append") #, overwrite_schema = True


def write_rsa_delta(p, annee1, mois1):
    # pour lire les fichiers .rsa et tra, normaliser et écrire toutes les tables de rsa en delta
    
    temp = prepare_rsa(p, annee1, mois1)

    for k in ['rsa', 'actes', 'diags', 'rsa_um']:
        one_delta(temp, k)

def remote_table(pmsi_table):
    # pour faciliter la syntaxe, un wrapper pour accéder aux données LazyFrame
    
    return pl.scan_delta("/Users/guillaumepressiat/Documents/data/delta/" + pmsi_table.replace('@', '/'))

def count_pivot_periode(pmsi_table):
    # tri à plat dans les fichiers delta pour observer ce qui a été intégré

    tempd = remote_table(pmsi_table)
    r = (tempd
     .group_by('am', 'ansor', 'moissor')
     .len()
     .collect()
     .sort('moissor')
     .pivot('moissor', values = 'len', index = 'ansor',
            aggregate_function = 'sum', maintain_order=True)
     .sort('ansor')
    )
    return r



def delete_in_delta(pmsi_table, string_delete):
    # fonction pour supprimer des données dans delta table (utilise pyspark)
    
    dt_temp = DeltaTable.forPath(spark, '/Users/guillaumepressiat/Documents/data/delta/' + pmsi_table.replace('@', '/'))
    dt_temp.delete(string_delete)

Chargement des données

empty = normalized_schema()
# empty['rsa'].schema
# de 2011 M12 à 2022 M12
for i in range(2013, 2024):
    # print(i)
    write_rsa_delta(p, i, 12)
# 2024 M07, année en cours, séparemment
# write_rsa_delta(p, 2024, 7)
# remote_table('rsa@actes').collect().filter(pl.col('ansor').is_null())

Volumétries

%%time
count_pivot_periode('rsa@rsa')
CPU times: user 134 ms, sys: 29.7 ms, total: 164 ms
Wall time: 53.9 ms
shape: (11, 13)
ansor 01 02 03 04 05 06 07 08 09 10 11 12
str u32 u32 u32 u32 u32 u32 u32 u32 u32 u32 u32 u32
"2013" 9167 8806 9540 9158 8465 8679 9317 8032 8305 9309 8432 8751
"2014" 9405 8525 9006 9063 8563 8986 9274 8263 9450 9926 8779 9518
"2015" 9296 8970 9763 9019 8374 9561 9216 8171 9493 9999 9419 9905
"2016" 9724 9591 10515 9912 9812 10216 9229 9272 10269 10187 10333 10567
"2017" 10668 9656 11052 9818 10936 11431 10230 10274 10662 11118 10731 10361
"2019" 11665 10526 11687 11162 10776 10660 11233 9841 10745 11357 10570 11238
"2020" 11885 10942 10220 8568 9483 11289 11469 10046 11786 11417 10480 11335
"2021" 11310 10858 12453 11879 11453 12084 11418 10650 12091 11910 11593 11981
"2022" 11913 11075 12889 11559 12057 12120 11048 11280 12257 12171 11766 11766
"2023" 12557 11288 13248 11169 12028 13015 11572 11321 11643 12432 11955 11235
%%time
count_pivot_periode('rsa@rsa_um')
CPU times: user 141 ms, sys: 26.3 ms, total: 167 ms
Wall time: 54.3 ms
shape: (11, 13)
ansor 01 02 03 04 05 06 07 08 09 10 11 12
str u32 u32 u32 u32 u32 u32 u32 u32 u32 u32 u32 u32
"2013" 10330 9828 10647 10439 9549 9711 10512 8966 9341 10474 9427 10017
"2014" 10624 9735 10185 10269 9633 10124 10454 9283 10494 11090 9982 10797
"2015" 10478 10145 11098 10246 9490 10877 10511 9181 10668 11344 10679 11236
"2016" 10841 10739 11880 11112 11013 11448 10382 10437 11431 11295 11570 11891
"2017" 11940 10976 12389 10960 12232 12629 11463 11320 11811 12396 11968 11636
"2019" 13112 11791 13112 12476 12075 11918 12519 10909 12017 12720 11853 12524
"2020" 13209 12312 11564 9666 10632 12422 12671 11007 13037 12627 11629 12613
"2021" 12480 12037 13841 13153 12636 13353 12683 11718 13282 13238 12878 13286
"2022" 13153 12207 14204 12799 13317 13451 12325 12284 13474 13516 13011 13148
"2023" 13884 12507 14612 12360 13309 14284 12726 12290 12801 13772 13281 12432
%%time
count_pivot_periode('rsa@actes')
CPU times: user 416 ms, sys: 75.1 ms, total: 491 ms
Wall time: 143 ms
shape: (11, 13)
ansor 01 02 03 04 05 06 07 08 09 10 11 12
str u32 u32 u32 u32 u32 u32 u32 u32 u32 u32 u32 u32
"2013" 39450 35718 40894 38241 34470 36537 39552 35240 34559 37183 33334 36499
"2014" 36827 36179 36302 37509 33753 33814 34192 31689 32734 29792 31938 35835
"2015" 34416 33117 37165 34225 33908 35155 36629 30097 35562 36297 34807 36216
"2016" 34992 35461 39169 36779 36655 36178 34188 33633 36894 35207 35265 36268
"2017" 38756 35172 38395 35454 35330 37909 38250 34571 36344 36986 34595 36148
"2019" 39926 35786 40429 37587 35955 36130 37412 31360 37032 37933 33767 35833
"2020" 37193 35813 34328 27193 30666 35311 37232 33115 37461 37052 33694 34086
"2021" 34487 35112 41197 37287 33438 36341 35283 31555 34607 33530 31714 34134
"2022" 34516 34555 38610 33976 38406 37171 33459 32308 36255 35948 34459 36271
"2023" 40215 34487 39588 34359 35284 38578 34549 31679 34145 35845 36788 33018
%%time
count_pivot_periode('rsa@diags')
CPU times: user 747 ms, sys: 201 ms, total: 948 ms
Wall time: 272 ms
shape: (11, 13)
ansor 01 02 03 04 05 06 07 08 09 10 11 12
str u32 u32 u32 u32 u32 u32 u32 u32 u32 u32 u32 u32
"2013" 51552 48464 55267 56511 50818 51976 56642 48546 51040 57132 50241 53880
"2014" 58166 53540 55703 57256 52912 54255 55540 50383 55682 59274 52908 56873
"2015" 55375 55018 59977 56956 50693 59464 57639 50895 57099 60067 56522 60209
"2016" 59527 59878 66432 62540 60556 62283 57176 57516 62489 61596 62967 63929
"2017" 65743 61249 68527 60981 66991 67664 63843 62677 62332 65690 65671 64233
"2019" 72868 67123 73669 70692 65988 66030 71534 61964 67261 68819 62005 65833
"2020" 70200 64916 62578 55956 59858 67101 68166 60045 68677 68823 62851 66577
"2021" 67093 65017 74707 70623 67517 68375 69498 64400 69719 69529 67911 67933
"2022" 70203 65027 75626 70335 72738 71725 68589 65744 71751 72048 67011 69758
"2023" 70549 66490 79108 68151 73326 76715 68492 66203 68642 74670 73884 68997

Update (delete de l’année en cours)

On passe à pyspark pour pouvoir lancer des delete dans les fichiers delta lake (il faut installer un java, d’une certaine version, pour que cela fonctionne correctement `

avec quelque chose comme :

export JAVA_HOME=`/usr/libexec/java_home -v 1.8`
from pyspark.sql import SparkSession

#from pyspark import SparkContext
#SparkContext.setSystemProperty('spark.executor.memory', '4g')

builder = pyspark.sql.SparkSession.builder.appName("MyPmsiDLake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.executor.memory", "10g") \
    .config("spark.executor.cores", 12) \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors","10") \
    .config("spark.dynamicAllocation.maxExecutors","15")

spark = configure_spark_with_delta_pip(builder).getOrCreate()


# https://stackoverflow.com/a/24657630/10527496
# https://stackoverflow.com/questions/70192980/error-creating-sparkcontext-locally-an-error-occurred-while-calling-none-org-apa
24/09/08 23:15:12 WARN Utils: Your hostname, ......... resolves to a loopback address: 127.0.0.1; using ......... instead (on interface en0)
24/09/08 23:15:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/guillaumepressiat/.ivy2/cache
The jars for the packages stored in: /Users/guillaumepressiat/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-126ee81f-c0c6-45e3-aeef-1cbcde32f6e4;1.0
    confs: [default]
    found io.delta#delta-core_2.12;2.4.0 in central
    found io.delta#delta-storage;2.4.0 in central
    found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 80ms :: artifacts dl 3ms
    :: modules in use:
    io.delta#delta-core_2.12;2.4.0 from central in [default]
    io.delta#delta-storage;2.4.0 from central in [default]
    org.antlr#antlr4-runtime;4.9.3 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-126ee81f-c0c6-45e3-aeef-1cbcde32f6e4
    confs: [default]
    0 artifacts copied, 3 already retrieved (0kB/2ms)
24/09/08 23:15:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
:: loading settings :: url = jar:file:/opt/homebrew/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
annee_delete = '2023'
string_delete = "ansor = \"" + annee_delete + "\""
string_delete
# string_delete = "ansor is NULL"
'ansor = "2023"'
for i in ['rsa@rsa', 'rsa@actes', 'rsa@diags', 'rsa@rsa_um']:
    delete_in_delta(i, string_delete)
    
24/09/08 23:15:17 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled.
Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize=
CodeCache: size=131072Kb used=48351Kb max_used=48384Kb free=82720Kb
 bounds [0x0000000106880000, 0x0000000109830000, 0x000000010e880000]
 total_blobs=17469 nmethods=16361 adapters=1019
 compilation: disabled (not enough contiguous free space left)

2023 a bien été supprimé

%%time
count_pivot_periode('rsa@rsa')
CPU times: user 133 ms, sys: 50.5 ms, total: 183 ms
Wall time: 60.5 ms
shape: (10, 13)
ansor 01 02 03 04 05 06 07 08 09 10 11 12
str u32 u32 u32 u32 u32 u32 u32 u32 u32 u32 u32 u32
"2013" 9167 8806 9540 9158 8465 8679 9317 8032 8305 9309 8432 8751
"2014" 9405 8525 9006 9063 8563 8986 9274 8263 9450 9926 8779 9518
"2015" 9296 8970 9763 9019 8374 9561 9216 8171 9493 9999 9419 9905
"2016" 9724 9591 10515 9912 9812 10216 9229 9272 10269 10187 10333 10567
"2017" 10668 9656 11052 9818 10936 11431 10230 10274 10662 11118 10731 10361
"2018" 10934 10013 11135 10196 10086 10838 10730 10364 10529 11423 10974 10944
"2019" 11665 10526 11687 11162 10776 10660 11233 9841 10745 11357 10570 11238
"2020" 11885 10942 10220 8568 9483 11289 11469 10046 11786 11417 10480 11335
"2021" 11310 10858 12453 11879 11453 12084 11418 10650 12091 11910 11593 11981
"2022" 11913 11075 12889 11559 12057 12120 11048 11280 12257 12171 11766 11766
for i in ['rsa@rsa', 'rsa@actes', 'rsa@diags', 'rsa@rsa_um']:
    delete_in_delta(i, "moissor in ('03', '06', '09', '12')")
24/09/08 23:15:28 WARN MemoryManager: ..........
%%time
count_pivot_periode('rsa@rsa')
CPU times: user 113 ms, sys: 22.2 ms, total: 135 ms
Wall time: 60.6 ms
shape: (10, 9)
ansor 01 02 04 05 07 08 10 11
str u32 u32 u32 u32 u32 u32 u32 u32
"2013" 9167 8806 9158 8465 9317 8032 9309 8432
"2014" 9405 8525 9063 8563 9274 8263 9926 8779
"2015" 9296 8970 9019 8374 9216 8171 9999 9419
"2016" 9724 9591 9912 9812 9229 9272 10187 10333
"2017" 10668 9656 9818 10936 10230 10274 11118 10731
"2018" 10934 10013 10196 10086 10730 10364 11423 10974
"2019" 11665 10526 11162 10776 11233 9841 11357 10570
"2020" 11885 10942 8568 9483 11469 10046 11417 10480
"2021" 11310 10858 11879 11453 11418 10650 11910 11593
"2022" 11913 11075 11559 12057 11048 11280 12171 11766

vacuum, à faire

Nettoyer régulièrement les fichiers parquet inutiles

Tests

On ne peut pas joindre les tables delta avec polars > on scan / filtre et on collecte les éléments par morceaux.

Par morceaux, on entend : entre parenthèses, on collecte les lignes correspondants à la table rsa@rsa, puis la table rsa@actes et on joint ensuite ces deux tables.

Dans notre exemple ici :

  • on s’intéresse aux séjours-RSS chirurgicaux (GHM type C), sortis en mai 2020
  • qui ont un acte de remplacement de l’articulation hanche ou genou par prothèse totale (actes N.KA), on filtre sur l’activité CCAM 1
  • on rajoutera une clause ensuite sur la présence d’un diagnostic S72, fracture du col du fémur en position principale du séjour (position = 1)


# d'abord les rsa concernés par le filtre GHM C sortis en mai 2020
(remote_table('rsa@rsa')
 .filter(pl.col('am') == '202005')
 .filter(pl.col('rsatype') == 'C')
 .unique()
 .collect()
)
# Ensuite les actes correspondants au pattern
(remote_table('rsa@actes')
 .filter((pl.col('am') == '202005')) 
 .filter(pl.col('cdccam').str.contains('N.KA'))
 .filter(pl.col('act') == '1')
 .unique()
 .collect()
)
# Et la table des diagnostics
(remote_table('rsa@diags')
 .filter((pl.col('am') == '202005')) 
 .filter(pl.col('diag').str.contains('S72'))
 .filter(pl.col('position') == 1)
 .select('cle_rsa')
 .unique()
 .collect()
)

rsa partie fixe & actes

%%time
# Jointure entre la table des RSA partie fixe et la table des actes
(remote_table('rsa@rsa')
 .filter(pl.col('am') == '202005')
 .filter(pl.col('rsatype') == 'C')
 .unique()
 .collect()
).join(
(remote_table('rsa@actes')
 .filter((pl.col('am') == '202005')) 
 .filter(pl.col('cdccam').str.contains('N.KA'))
 .filter(pl.col('act') == '1')
 .unique()
 .collect()
), on = 'cle_rsa', how = 'inner'
).group_by('nofiness', 'am').agg(pl.col('cle_rsa').n_unique().alias('count'))
CPU times: user 978 ms, sys: 347 ms, total: 1.32 s
Wall time: 463 ms
shape: (1, 3)
nofiness am count
str str u32
"290000017" "202005" 32

En alternative, on peut aussi utiliser les colonnes “stream” de la table RSA qui contiennent tous les actes, diags et permettent d’éviter de faire des jointures entre plusieurs tables quand l’information souhaitée est juste (présence ou absence de l’acte, du diag, etc.), c’est ce qui est fait ci-dessous

%%time
# alternative en utilisant la zone "stream" des actes en ligne
(remote_table('rsa@rsa')
 .filter(
     (pl.col('am') == '202005') & 
     (pl.col('rsatype') == 'C') & 
         (pl.col('stream_actes').str.contains('N.KA')))
 .unique()
 .collect()
).group_by('nofiness', 'am').agg(pl.col('cle_rsa').n_unique().alias('count'))
CPU times: user 1.05 s, sys: 252 ms, total: 1.3 s
Wall time: 431 ms
shape: (1, 3)
nofiness am count
str str u32
"290000017" "202005" 32

rsa partie fixe & actes & diags

On ajoute la clause sur le diagnostic principal S72


%%time
# Jointure entre la table des RSA partie fixe et la table des actes
(remote_table('rsa@rsa')
 .filter(pl.col('am') == '202005')
 .filter(pl.col('rsatype') == 'C')
 .unique()
 .collect()
).join(
(remote_table('rsa@actes')
 .filter((pl.col('am') == '202005')) 
 .filter(pl.col('cdccam').str.contains('N.KA'))
 .filter(pl.col('act') == '1')
 .unique()
 .collect()
), on = 'cle_rsa', how = 'inner'
).join(
(remote_table('rsa@diags')
 .filter((pl.col('am') == '202005')) 
 .filter(pl.col('diag').str.contains('S72'))
 .filter(pl.col('position') == 1)
 .select('cle_rsa')
 .unique()
 .collect()
), on = 'cle_rsa', how = 'inner'
).group_by('nofiness', 'am').agg(pl.col('cle_rsa').n_unique().alias('count'))
CPU times: user 1.27 s, sys: 179 ms, total: 1.44 s
Wall time: 507 ms
shape: (1, 3)
nofiness am count
str str u32
"290000017" "202005" 14
%%time
# alternative en utilisant la zone "stream" des actes en ligne, et la colonne dp
(remote_table('rsa@rsa')
 .filter(
     (pl.col('am') == '202005') & 
     (pl.col('rsatype') == 'C') & 
         (pl.col('stream_actes').str.contains('N.KA')) & 
         (pl.col('dp').str.contains('S72')))
 .unique()
 .collect()
).group_by('nofiness', 'am').len()
CPU times: user 1.04 s, sys: 148 ms, total: 1.18 s
Wall time: 389 ms
shape: (1, 3)
nofiness am len
str str u32
"290000017" "202005" 14

Nombre de lignes / colonnes des tables

Après avoir enlevé 2023 M04

remote_table('rsa@rsa').collect().shape
(816519, 108)
remote_table('rsa@actes').collect().shape
(2818046, 19)
remote_table('rsa@diags').collect().shape
(4972199, 10)
remote_table('rsa@rsa_um').collect().shape
(912675, 23)

Description des rsa

Avec la fonction describle de polars on affiche des statistiques surles données rsa.

Le choix a été fait d’ordonner les colonnes par ordre alphabétique.

remote_table('rsa@rsa').collect().describe()
shape: (9, 109)
statistic admission_maison_naissance adnp75 agean agegest agejr am anivgprec anseqta ansor cat_nb_intervenants cdgeo cdpostal cdpu cle_rsa confcdsej conversion_hc delaireg dest dp dr dtent dtsort duree echpmsi ell_forf_diabete ell_gradation genautorsa ghm ghm1 ghshorsinno ghsminore gpcdretr gpcmd gpcompx gpnum gptype novrsa novrss numinno paslitsp pc_raac periode poids prov rescrit_tarifaire resererve_hosp rsacdretr rsacmd rsacompx rsanum rsatype rsavclass schpmsi sejinfbi sexe stream_actes stream_das stream_dpum stream_drum stream_um suppdefcard surveillance_particuliere topadmnais topctc topradalimta topradavastin typedosim typesej typmachradio typrestpo uhcd valvaort zrdth
str str str f64 f64 f64 str str str str str str str str str str str f64 str str str str str f64 str str str str str str str str str str str str str str str str str str str f64 str str str str str str str str str str str str str str str str str str str str str str str str str str str str str str
"count" "268230" "345" 788647.0 31919.0 27872.0 "816519" "78064" "816519" "816519" "8339" "816519" "268230" "0" "816519" "742" "393" 25849.0 "57401" "816519" "393904" "816519" "816519" 816519.0 "816519" "64939" "154651" "674035" "816519" "816519" "601571" "655769" "816519" "816519" "816519" "816519" "816519" "816519" "816519" "5" "816519" "333169" "816519" 18270.0 "190281" "0" "3996" "816519" "816519" "816519" "816519" "816519" "816519" "816519" "816519" "816519" "626615" "419122" "816519" "397444" "816519" "503187" "9761" "0" "0" "0" "582256" "155923" "966" "122544" "816519" "816519" "1117" "4299"
"null_count" "548289" "816174" 27872.0 784600.0 788647.0 "0" "738455" "0" "0" "808180" "0" "548289" "816519" "0" "815777" "816126" 790670.0 "759118" "0" "422615" "0" "0" 0.0 "0" "751580" "661868" "142484" "0" "0" "214948" "160750" "0" "0" "0" "0" "0" "0" "0" "816514" "0" "483350" "0" 798249.0 "626238" "816519" "812523" "0" "0" "0" "0" "0" "0" "0" "0" "0" "189904" "397397" "0" "419075" "0" "313332" "806758" "816519" "816519" "816519" "234263" "660596" "815553" "693975" "0" "0" "815402" "812220"
"mean" null null 58.678246 38.513675 55.50653 null null null null null null null null null null null 215.117722 null null null "2018-03-18 07:32:21.772000" "2018-03-20 21:00:43.053000" 2.561357 null null null null null null null null null null null null null null null null null null null 3183.633881 null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null
"std" null null 21.56475 2.825167 94.605443 null null null null null null null null null null null 86.785808 null null null null null 6.34697 null null null null null null null null null null null null null null null null null null null 712.20168 null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null
"min" "0" "1" 1.0 17.0 0.0 "201301" "0" "2012" "2013" "A" "1000" "1170" null "1" "1" "1" 0.0 "1" "A010" "A010" "2012-04-23" "2013-01-01" 0.0 "0" "0" "0" "0" "01C031" "01C031" "22" "0" "0" "1" "1" "0" "C" "220" "116" "INNOV1441001H" "0" "0" "201301" 100.0 "1" null "1" "0" "1" "1" "2" "C" "11" "0" "0" "1" "AAFA001, ACQK001, YYYY467, ZZQ… "A022" "A010" ", A169" "00 C, 29 M" "0" "1" null null null "0" "1" "A" "1" "0" "0" "1" "9620001"
"25%" null null 48.0 38.0 0.0 null null null null null null null null null null null 168.0 null null null "2015-10-24" "2015-10-27" 0.0 null null null null null null null null null null null null null null null null null null null 2870.0 null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null
"50%" null null 63.0 39.0 0.0 null null null null null null null null null null null 263.0 null null null "2018-04-30" "2018-05-03" 0.0 null null null null null null null null null null null null null null null null null null null 3280.0 null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null
"75%" null null 74.0 40.0 72.0 null null null null null null null null null null null 279.0 null null null "2020-10-02" "2020-10-06" 3.0 null null null null null null null null null null null null null null null null null null null 3630.0 null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null
"max" "0" "2" 119.0 44.0 365.0 "202211" "2014" "2022" "2022" "C" "9JC04" "99422" null "141901" "2" "2" 999.0 "7" "Z998" "Z992+1" "2022-11-30" "2022-11-30" 466.0 "N" "0" "1" "0" "28Z24Z" "28Z24Z" "9999" "1" "252" "90" "Z" "62" "Z" "227" "121" "INNOV1941008Z" "1" "2" "202211" 6640.0 "U" null "2" "252" "28" "Z" "62" "Z" "11" "9" "2" "2" "ZZQX217, ZZQX162, HZHE002, HHQ… "Z998, Z993" "Z998, T543" "Z992+" "88 P, 04 C" "1" "2" null null null "1" "4" "B" "4" "3" "1" "1" "9632020"

Historique

On peut retracer les types d’actions réalisées avec les fichiers de méta-données json.

from deltalake import DeltaTable as DDT
dt = DDT("/Users/guillaumepressiat/Documents/data/delta/rsa/rsa")
dt.history()
[{'timestamp': 1725830160223,
  'operation': 'DELETE',
  'operationParameters': {'predicate': '["moissor#7581 IN (03,06,09,12)"]'},
  'readVersion': 11,
  'isolationLevel': 'Serializable',
  'isBlindAppend': False,
  'engineInfo': 'Apache-Spark/3.4.3 Delta-Lake/2.4.0',
  'operationMetrics': {'executionTimeMs': '32936',
   'numAddedBytes': '42490353',
   'numAddedChangeFiles': '0',
   'numAddedFiles': '10',
   'numCopiedRows': '816519',
   'numDeletedRows': '426077',
   'numRemovedBytes': '69025074',
   'numRemovedFiles': '10',
   'rewriteTimeMs': '32476',
   'scanTimeMs': '460'},
  'txnId': '35d8d37b-cd5f-4ff3-b780-76f260da9392',
  'version': 12},
 {'timestamp': 1725830121484,
  'operation': 'DELETE',
  'operationParameters': {'predicate': '["(ansor#45 = 2023)"]'},
  'readVersion': 10,
  'isolationLevel': 'Serializable',
  'isBlindAppend': False,
  'engineInfo': 'Apache-Spark/3.4.3 Delta-Lake/2.4.0',
  'txnId': '87d5d0fc-6b9c-49cb-9024-ea03a92d764d',
  'operationMetrics': {'executionTimeMs': '4023',
   'numAddedBytes': '0',
   'numAddedChangeFiles': '0',
   'numAddedFiles': '0',
   'numCopiedRows': '0',
   'numDeletedRows': '143463',
   'numRemovedBytes': '7880925',
   'numRemovedFiles': '1',
   'rewriteTimeMs': '878',
   'scanTimeMs': '3144'},
  'version': 11},
 {'timestamp': 1725830110895,
  'operation': 'WRITE',
  'operationParameters': {'mode': 'Append'},
  'clientVersion': 'delta-rs.0.19.1',
  'operationMetrics': {'execution_time_ms': 269,
   'num_added_files': 1,
   'num_added_rows': 143463,
   'num_partitions': 0,
   'num_removed_files': 0},
  'version': 10},
 {'timestamp': 1725830083480,
  'operation': 'WRITE',
  'operationParameters': {'mode': 'Append'},
  'clientVersion': 'delta-rs.0.19.1',
  'operationMetrics': {'execution_time_ms': 259,
   'num_added_files': 1,
   'num_added_rows': 141901,
   'num_partitions': 0,
   'num_removed_files': 0},
  'version': 9},
 {'timestamp': 1725830056673,
  'operation': 'WRITE',
  'operationParameters': {'mode': 'Append'},
  'operationMetrics': {'execution_time_ms': 258,
   'num_added_files': 1,
   'num_added_rows': 139680,
   'num_partitions': 0,
   'num_removed_files': 0},
  'clientVersion': 'delta-rs.0.19.1',
  'version': 8},
 {'timestamp': 1725830030232,
  'operation': 'WRITE',
  'operationParameters': {'mode': 'Append'},
  'clientVersion': 'delta-rs.0.19.1',
  'operationMetrics': {'execution_time_ms': 244,
   'num_added_files': 1,
   'num_added_rows': 128920,
   'num_partitions': 0,
   'num_removed_files': 0},
  'version': 7},
 {'timestamp': 1725830005855,
  'operation': 'WRITE',
  'operationParameters': {'mode': 'Append'},
  'operationMetrics': {'execution_time_ms': 238,
   'num_added_files': 1,
   'num_added_rows': 131460,
   'num_partitions': 0,
   'num_removed_files': 0},
  'clientVersion': 'delta-rs.0.19.1',
  'version': 6},
 {'timestamp': 1725829980922,
  'operation': 'WRITE',
  'operationParameters': {'mode': 'Append'},
  'clientVersion': 'delta-rs.0.19.1',
  'operationMetrics': {'execution_time_ms': 227,
   'num_added_files': 1,
   'num_added_rows': 128166,
   'num_partitions': 0,
   'num_removed_files': 0},
  'version': 5},
 {'timestamp': 1725829956726,
  'operation': 'WRITE',
  'operationParameters': {'mode': 'Append'},
  'clientVersion': 'delta-rs.0.19.1',
  'operationMetrics': {'execution_time_ms': 223,
   'num_added_files': 1,
   'num_added_rows': 126934,
   'num_partitions': 0,
   'num_removed_files': 0},
  'version': 4},
 {'timestamp': 1725829932609,
  'operation': 'WRITE',
  'operationParameters': {'mode': 'Append'},
  'clientVersion': 'delta-rs.0.19.1',
  'operationMetrics': {'execution_time_ms': 216,
   'num_added_files': 1,
   'num_added_rows': 119630,
   'num_partitions': 0,
   'num_removed_files': 0},
  'version': 3},
 {'timestamp': 1725829909939,
  'operation': 'WRITE',
  'operationParameters': {'mode': 'Append'},
  'clientVersion': 'delta-rs.0.19.1',
  'operationMetrics': {'execution_time_ms': 196,
   'num_added_files': 1,
   'num_added_rows': 111186,
   'num_partitions': 0,
   'num_removed_files': 0},
  'version': 2},
 {'timestamp': 1725829888762,
  'operation': 'WRITE',
  'operationParameters': {'mode': 'Append'},
  'clientVersion': 'delta-rs.0.19.1',
  'operationMetrics': {'execution_time_ms': 188,
   'num_added_files': 1,
   'num_added_rows': 108758,
   'num_partitions': 0,
   'num_removed_files': 0},
  'version': 1},
 {'timestamp': 1725829868162,
  'operation': 'WRITE',
  'operationParameters': {'mode': 'Append'},
  'operationMetrics': {'execution_time_ms': 182,
   'num_added_files': 1,
   'num_added_rows': 105961,
   'num_partitions': 0,
   'num_removed_files': 0},
  'clientVersion': 'delta-rs.0.19.1',
  'version': 0}]
from os import system

system('tree /Users/guillaumepressiat/Documents/data/delta/rsa/rsa')
/Users/guillaumepressiat/Documents/data/delta/rsa/rsa
├── _delta_log
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.json
│   ├── 00000000000000000002.json
│   ├── 00000000000000000003.json
│   ├── 00000000000000000004.json
│   ├── 00000000000000000005.json
│   ├── 00000000000000000006.json
│   ├── 00000000000000000007.json
│   ├── 00000000000000000008.json
│   ├── 00000000000000000009.json
│   ├── 00000000000000000010.json
│   ├── 00000000000000000011.json
│   └── 00000000000000000012.json
├── part-00000-7eb13227-ce82-497f-84fb-e767416c6016-c000.snappy.parquet
├── part-00000-d831c120-2af5-4a49-9599-958e815b00ad-c000.snappy.parquet
├── part-00001-0e6a7d99-4ffc-4775-8318-27178f63e39a-c000.snappy.parquet
├── part-00001-2ac368f0-6f07-4766-a06b-aaa20ad1d6a7-c000.snappy.parquet
├── part-00001-61fa5259-a82e-4256-b0eb-be26c19a1a29-c000.snappy.parquet
├── part-00001-76418848-c673-403d-9a7a-c773c856ee3c-c000.snappy.parquet
├── part-00001-77c542d0-ba31-4fdc-a30c-06c334fc3b5c-c000.snappy.parquet
├── part-00001-7b0eb38f-e0be-4eb4-8138-3dbf30978d4c-c000.snappy.parquet
├── part-00001-80b59950-28a5-4481-8b03-7dcbe88d4dae-c000.snappy.parquet
├── part-00001-9f431d18-d08a-4d2d-8557-e9a979699a90-c000.snappy.parquet
├── part-00001-ab213811-43bd-4a3a-9023-1c2912f038fa-c000.snappy.parquet
├── part-00001-caf7bf87-5b80-4df2-8b1e-fac7b836578a-c000.snappy.parquet
├── part-00001-d62c2a4e-4733-4754-982e-ef3cb1f7cb9b-c000.snappy.parquet
├── part-00001-fb057c18-9f16-4761-8631-27bbae0ae08e-c000.snappy.parquet
├── part-00002-779afa11-e83a-44dd-9c7d-12849d36d9f3-c000.snappy.parquet
├── part-00003-fdd6d234-ab8e-434e-9ab2-7d693451dafd-c000.snappy.parquet
├── part-00004-a206b937-6be0-4679-8a1e-918b49046047-c000.snappy.parquet
├── part-00005-7557bc83-6022-484e-a4e5-f464160d49aa-c000.snappy.parquet
├── part-00006-8c380bc5-f433-4e24-b713-f4ef67cbdf4d-c000.snappy.parquet
├── part-00007-bc244fe1-7a8e-4278-9698-4e3af583f873-c000.snappy.parquet
├── part-00008-b867aaf6-c4e9-417d-9d12-d0f0862c2bab-c000.snappy.parquet
└── part-00009-a60554e9-66b9-465b-8b31-f55ab0e81a4d-c000.snappy.parquet

2 directories, 35 files
0