import pandas as pd
import polars as pl
import pypmsi as pm
import pyspark
from delta import *
from delta.tables import *
from pyspark.sql.functions import *
Intégration des rsa au format delta table avec { pypmsi, polars / spark }
Objectif
L’objectif de ce travail était de tester la faisabilité d’harmoniser les 4 tables de RSA sur l’historique 2013 – 2023 pour les intégrer dans un schéma homogène de données souhaité par Delta Lake (il est souhaité / préférable que le type, l’ordre et le nom des colonnes soient identiques pour faire du APPEND ; c’est en tout cas le cas avec polars
).
- La première partie de ce document se concentre sur cette partie de normalisation des données (au sens où on les force à avoir toutes le même format d’année en année
- La 2ème partie teste l’intégration des données RSA de 2013 M12 à 2023 M12 sous forme de fichiers parquet écrit avec le module polars
- La 3ème partie comprend des tests de requêtage des données avec polars uniquement ; une fonction
remote_table('rsa@rsa')
permet d’accéder aux données via unLazyFrame
polars flêchant vers les fichiers parquet - La dernière partie montre le contenu d’un des fichiers json qui historise les étapes d’alimentations de la table
rsa@rsa
au format Delta
Au cours de ce test, un partitionnement par année-mois a été expérimenté mais les temps de requêtages étaient augmentés de manière importante (* 4, selon les volumes intégrés). L’option de mono-partition semble préférable tant que les volumes de données sont raisonnables.
Cette étape d’intégration en gardant toutes les colonnes des données PMSI sans en exclure aucune pourrait correspondre à l’étape de la couche bronze du lac de données, avec les données brutes associées.
N’ont pas été fait :
- tester la commande SQL
ADD COLUMN
/ALTER TABLE
sur certaines tables si les formats 2024 des données PMSI changeait, et pour les années à suivre. - créer la clé primaire contenant l’année de sortie et le numéro de RSS accolés permettant d’avoir une unicité dans les relations
- tester la même chose sur les RSS, la même donnée contenant le lien avec les passages dans les unités médicales, mais cela peut être adapté de manière très similaire
Intégrations
Paramètres
= pm.noyau_pmsi(finess = 290000017,
p = 2023,
annee = 12,
mois = '/Users/guillaumepressiat/Documents/data/mco')
path
#param_n_max = 1000
= None param_n_max
Fonctions utiles
def normalized_schema():
# Définir un schéma type de données
# On récupère des années antérieures la dispersion des formats pmsi entre 2011 et 2023,
# cela permettra de mettre en cohérence les données sous un seul "schéma" avant de
# les insérer en delta table
= {'rsa' : pl.DataFrame(), 'actes' : pl.DataFrame(), 'diags' : pl.DataFrame(), 'rsa_um' : pl.DataFrame()}
rsa_temp
for i in range(2023, 2012, -1):
# print(i)
= p.irsa(annee = i, mois = 12, typi = 6, n_rows = 1000)
temp
# concat avec how = diagonal ici, on ajoute des colonnes si elles manquent
'rsa'] = pl.concat([rsa_temp['rsa'], temp['rsa']], how="diagonal")
rsa_temp['actes'] = pl.concat([rsa_temp['actes'], temp['actes']], how="diagonal")
rsa_temp['diags'] = pl.concat([rsa_temp['diags'], temp['diags']], how="diagonal")
rsa_temp['rsa_um'] = pl.concat([rsa_temp['rsa_um'], temp['rsa_um']], how="diagonal")
rsa_temp[
= p.irsa(annee = 2023, mois = 12, typi = 6, n_rows = 1000)
temp
# pour aller vite, 2023 est à part car mois = 4
'rsa'] = pl.concat([rsa_temp['rsa'], temp['rsa']], how="diagonal")
rsa_temp['actes'] = pl.concat([rsa_temp['actes'], temp['actes']], how="diagonal")
rsa_temp['diags'] = pl.concat([rsa_temp['diags'], temp['diags']], how="diagonal")
rsa_temp['rsa_um'] = pl.concat([rsa_temp['rsa_um'], temp['rsa_um']], how="diagonal")
rsa_temp[
'rsa'] = rsa_temp['rsa'].join(p.itra(annee = 2023, mois = 12, n_rows = 1000), how = 'left', on = 'cle_rsa')
rsa_temp[
'rsa'] = (rsa_temp['rsa']
rsa_temp['ansor', 'moissor']).alias('periode'),
.with_columns(pl.concat_str(['ansor', 'moissor']).alias('am')))
pl.concat_str([
= dict()
empty 'rsa'] = pl.DataFrame(schema = rsa_temp['rsa'].schema)
empty['actes'] = pl.DataFrame(schema = rsa_temp['actes'].schema)
empty['diags'] = pl.DataFrame(schema = rsa_temp['diags'].schema)
empty['rsa_um'] = pl.DataFrame(schema = rsa_temp['rsa_um'].schema)
empty[
return empty
def normalize_schema_rsa(dfd, table):
# pour appliquer le schéma type de données si il n'est pas respecté
# Sur la base des formats de toutes les années (unions permissives)
# on concatène les rsa d'une année avec ces formats normalisés > on génère des colonnes null
# pour avoir un schéma commun à toutes les années
# how = align ici, et on ordonne les colonnes si elles ne sont pas dans le même ordre
if table != 'rsa':
= (pl.concat([dfd[table],empty[table]], how = 'align')
df sorted(empty[table].columns))
.select('rsa'].select('cle_rsa', 'am', 'periode', 'nas',
.join(dfd['norss', 'ansor', 'moissor'),
= 'cle_rsa', how = 'inner').with_columns(
on == "")
pl.when(pl.col(pl.Utf8) None)
.then(
.otherwise(pl.col(pl.Utf8))
.name.keep()
))else:
= pl.concat([empty[table], dfd['rsa']], how = 'align').with_columns(
df == "")
pl.when(pl.col(pl.Utf8) None)
.then(# keep original value
.otherwise(pl.col(pl.Utf8))
.name.keep()sorted(empty[table].columns))
).select(
return df
# on prépare les rsa d'une année (on les normalise vis-à-vis du schéma unifié
# avec la fonction au dessus
# + aussi ajout du fichier tra et de deux colonnes de temps
def prepare_rsa(p, annee1, mois1, n_max = param_n_max):
# Lire les données et les mettre sous le bon schéma
= p.irsa(typi = 6, annee = annee1, mois = mois1, n_rows = n_max)
rsa = p.itra(annee = annee1, mois = mois1, n_rows = n_max)
tra 'rsa'] = rsa['rsa'].join(tra, on = 'cle_rsa', how = 'inner')
rsa['rsa'] = (rsa['rsa']
rsa['ansor', 'moissor']).alias('periode'),
.with_columns(pl.concat_str(['ansor', 'moissor']).alias('am')))
pl.concat_str([= {k: normalize_schema_rsa(rsa, k) for k, v in rsa.items()}
rsa
return rsa
def one_delta(dfd, table):
# pour écrire une des tables des rsa au format delta
#print(dfd[table].schema)
#print(dfd[table].shape)
dfd[table].write_delta("/Users/guillaumepressiat/Documents/data/delta/rsa/" + table,
#large_dtypes = True,
= "append") #, overwrite_schema = True
mode
def write_rsa_delta(p, annee1, mois1):
# pour lire les fichiers .rsa et tra, normaliser et écrire toutes les tables de rsa en delta
= prepare_rsa(p, annee1, mois1)
temp
for k in ['rsa', 'actes', 'diags', 'rsa_um']:
one_delta(temp, k)
def remote_table(pmsi_table):
# pour faciliter la syntaxe, un wrapper pour accéder aux données LazyFrame
return pl.scan_delta("/Users/guillaumepressiat/Documents/data/delta/" + pmsi_table.replace('@', '/'))
def count_pivot_periode(pmsi_table):
# tri à plat dans les fichiers delta pour observer ce qui a été intégré
= remote_table(pmsi_table)
tempd = (tempd
r 'am', 'ansor', 'moissor')
.group_by(len()
.
.collect()'moissor')
.sort('moissor', values = 'len', index = 'ansor',
.pivot(= 'sum', maintain_order=True)
aggregate_function 'ansor')
.sort(
)return r
def delete_in_delta(pmsi_table, string_delete):
# fonction pour supprimer des données dans delta table (utilise pyspark)
= DeltaTable.forPath(spark, '/Users/guillaumepressiat/Documents/data/delta/' + pmsi_table.replace('@', '/'))
dt_temp dt_temp.delete(string_delete)
Chargement des données
= normalized_schema()
empty # empty['rsa'].schema
# de 2011 M12 à 2022 M12
for i in range(2013, 2024):
# print(i)
12) write_rsa_delta(p, i,
# 2024 M07, année en cours, séparemment
# write_rsa_delta(p, 2024, 7)
# remote_table('rsa@actes').collect().filter(pl.col('ansor').is_null())
Volumétries
%%time
'rsa@rsa') count_pivot_periode(
CPU times: user 134 ms, sys: 29.7 ms, total: 164 ms
Wall time: 53.9 ms
ansor | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
---|---|---|---|---|---|---|---|---|---|---|---|---|
str | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 |
"2013" | 9167 | 8806 | 9540 | 9158 | 8465 | 8679 | 9317 | 8032 | 8305 | 9309 | 8432 | 8751 |
"2014" | 9405 | 8525 | 9006 | 9063 | 8563 | 8986 | 9274 | 8263 | 9450 | 9926 | 8779 | 9518 |
"2015" | 9296 | 8970 | 9763 | 9019 | 8374 | 9561 | 9216 | 8171 | 9493 | 9999 | 9419 | 9905 |
"2016" | 9724 | 9591 | 10515 | 9912 | 9812 | 10216 | 9229 | 9272 | 10269 | 10187 | 10333 | 10567 |
"2017" | 10668 | 9656 | 11052 | 9818 | 10936 | 11431 | 10230 | 10274 | 10662 | 11118 | 10731 | 10361 |
… | … | … | … | … | … | … | … | … | … | … | … | … |
"2019" | 11665 | 10526 | 11687 | 11162 | 10776 | 10660 | 11233 | 9841 | 10745 | 11357 | 10570 | 11238 |
"2020" | 11885 | 10942 | 10220 | 8568 | 9483 | 11289 | 11469 | 10046 | 11786 | 11417 | 10480 | 11335 |
"2021" | 11310 | 10858 | 12453 | 11879 | 11453 | 12084 | 11418 | 10650 | 12091 | 11910 | 11593 | 11981 |
"2022" | 11913 | 11075 | 12889 | 11559 | 12057 | 12120 | 11048 | 11280 | 12257 | 12171 | 11766 | 11766 |
"2023" | 12557 | 11288 | 13248 | 11169 | 12028 | 13015 | 11572 | 11321 | 11643 | 12432 | 11955 | 11235 |
%%time
'rsa@rsa_um') count_pivot_periode(
CPU times: user 141 ms, sys: 26.3 ms, total: 167 ms
Wall time: 54.3 ms
ansor | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
---|---|---|---|---|---|---|---|---|---|---|---|---|
str | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 |
"2013" | 10330 | 9828 | 10647 | 10439 | 9549 | 9711 | 10512 | 8966 | 9341 | 10474 | 9427 | 10017 |
"2014" | 10624 | 9735 | 10185 | 10269 | 9633 | 10124 | 10454 | 9283 | 10494 | 11090 | 9982 | 10797 |
"2015" | 10478 | 10145 | 11098 | 10246 | 9490 | 10877 | 10511 | 9181 | 10668 | 11344 | 10679 | 11236 |
"2016" | 10841 | 10739 | 11880 | 11112 | 11013 | 11448 | 10382 | 10437 | 11431 | 11295 | 11570 | 11891 |
"2017" | 11940 | 10976 | 12389 | 10960 | 12232 | 12629 | 11463 | 11320 | 11811 | 12396 | 11968 | 11636 |
… | … | … | … | … | … | … | … | … | … | … | … | … |
"2019" | 13112 | 11791 | 13112 | 12476 | 12075 | 11918 | 12519 | 10909 | 12017 | 12720 | 11853 | 12524 |
"2020" | 13209 | 12312 | 11564 | 9666 | 10632 | 12422 | 12671 | 11007 | 13037 | 12627 | 11629 | 12613 |
"2021" | 12480 | 12037 | 13841 | 13153 | 12636 | 13353 | 12683 | 11718 | 13282 | 13238 | 12878 | 13286 |
"2022" | 13153 | 12207 | 14204 | 12799 | 13317 | 13451 | 12325 | 12284 | 13474 | 13516 | 13011 | 13148 |
"2023" | 13884 | 12507 | 14612 | 12360 | 13309 | 14284 | 12726 | 12290 | 12801 | 13772 | 13281 | 12432 |
%%time
'rsa@actes') count_pivot_periode(
CPU times: user 416 ms, sys: 75.1 ms, total: 491 ms
Wall time: 143 ms
ansor | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
---|---|---|---|---|---|---|---|---|---|---|---|---|
str | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 |
"2013" | 39450 | 35718 | 40894 | 38241 | 34470 | 36537 | 39552 | 35240 | 34559 | 37183 | 33334 | 36499 |
"2014" | 36827 | 36179 | 36302 | 37509 | 33753 | 33814 | 34192 | 31689 | 32734 | 29792 | 31938 | 35835 |
"2015" | 34416 | 33117 | 37165 | 34225 | 33908 | 35155 | 36629 | 30097 | 35562 | 36297 | 34807 | 36216 |
"2016" | 34992 | 35461 | 39169 | 36779 | 36655 | 36178 | 34188 | 33633 | 36894 | 35207 | 35265 | 36268 |
"2017" | 38756 | 35172 | 38395 | 35454 | 35330 | 37909 | 38250 | 34571 | 36344 | 36986 | 34595 | 36148 |
… | … | … | … | … | … | … | … | … | … | … | … | … |
"2019" | 39926 | 35786 | 40429 | 37587 | 35955 | 36130 | 37412 | 31360 | 37032 | 37933 | 33767 | 35833 |
"2020" | 37193 | 35813 | 34328 | 27193 | 30666 | 35311 | 37232 | 33115 | 37461 | 37052 | 33694 | 34086 |
"2021" | 34487 | 35112 | 41197 | 37287 | 33438 | 36341 | 35283 | 31555 | 34607 | 33530 | 31714 | 34134 |
"2022" | 34516 | 34555 | 38610 | 33976 | 38406 | 37171 | 33459 | 32308 | 36255 | 35948 | 34459 | 36271 |
"2023" | 40215 | 34487 | 39588 | 34359 | 35284 | 38578 | 34549 | 31679 | 34145 | 35845 | 36788 | 33018 |
%%time
'rsa@diags') count_pivot_periode(
CPU times: user 747 ms, sys: 201 ms, total: 948 ms
Wall time: 272 ms
ansor | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
---|---|---|---|---|---|---|---|---|---|---|---|---|
str | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 |
"2013" | 51552 | 48464 | 55267 | 56511 | 50818 | 51976 | 56642 | 48546 | 51040 | 57132 | 50241 | 53880 |
"2014" | 58166 | 53540 | 55703 | 57256 | 52912 | 54255 | 55540 | 50383 | 55682 | 59274 | 52908 | 56873 |
"2015" | 55375 | 55018 | 59977 | 56956 | 50693 | 59464 | 57639 | 50895 | 57099 | 60067 | 56522 | 60209 |
"2016" | 59527 | 59878 | 66432 | 62540 | 60556 | 62283 | 57176 | 57516 | 62489 | 61596 | 62967 | 63929 |
"2017" | 65743 | 61249 | 68527 | 60981 | 66991 | 67664 | 63843 | 62677 | 62332 | 65690 | 65671 | 64233 |
… | … | … | … | … | … | … | … | … | … | … | … | … |
"2019" | 72868 | 67123 | 73669 | 70692 | 65988 | 66030 | 71534 | 61964 | 67261 | 68819 | 62005 | 65833 |
"2020" | 70200 | 64916 | 62578 | 55956 | 59858 | 67101 | 68166 | 60045 | 68677 | 68823 | 62851 | 66577 |
"2021" | 67093 | 65017 | 74707 | 70623 | 67517 | 68375 | 69498 | 64400 | 69719 | 69529 | 67911 | 67933 |
"2022" | 70203 | 65027 | 75626 | 70335 | 72738 | 71725 | 68589 | 65744 | 71751 | 72048 | 67011 | 69758 |
"2023" | 70549 | 66490 | 79108 | 68151 | 73326 | 76715 | 68492 | 66203 | 68642 | 74670 | 73884 | 68997 |
Update (delete de l’année en cours)
On passe à pyspark pour pouvoir lancer des delete dans les fichiers delta lake (il faut installer un java, d’une certaine version, pour que cela fonctionne correctement `
avec quelque chose comme :
export JAVA_HOME=`/usr/libexec/java_home -v 1.8`
from pyspark.sql import SparkSession
#from pyspark import SparkContext
#SparkContext.setSystemProperty('spark.executor.memory', '4g')
= pyspark.sql.SparkSession.builder.appName("MyPmsiDLake") \
builder "spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.executor.memory", "10g") \
.config("spark.executor.cores", 12) \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.dynamicAllocation.minExecutors","10") \
.config("spark.dynamicAllocation.maxExecutors","15")
.config(
= configure_spark_with_delta_pip(builder).getOrCreate()
spark
# https://stackoverflow.com/a/24657630/10527496
# https://stackoverflow.com/questions/70192980/error-creating-sparkcontext-locally-an-error-occurred-while-calling-none-org-apa
24/09/08 23:15:12 WARN Utils: Your hostname, ......... resolves to a loopback address: 127.0.0.1; using ......... instead (on interface en0)
24/09/08 23:15:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/guillaumepressiat/.ivy2/cache
The jars for the packages stored in: /Users/guillaumepressiat/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-126ee81f-c0c6-45e3-aeef-1cbcde32f6e4;1.0
confs: [default]
found io.delta#delta-core_2.12;2.4.0 in central
found io.delta#delta-storage;2.4.0 in central
found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 80ms :: artifacts dl 3ms
:: modules in use:
io.delta#delta-core_2.12;2.4.0 from central in [default]
io.delta#delta-storage;2.4.0 from central in [default]
org.antlr#antlr4-runtime;4.9.3 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 3 | 0 | 0 | 0 || 3 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-126ee81f-c0c6-45e3-aeef-1cbcde32f6e4
confs: [default]
0 artifacts copied, 3 already retrieved (0kB/2ms)
24/09/08 23:15:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
:: loading settings :: url = jar:file:/opt/homebrew/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
= '2023'
annee_delete = "ansor = \"" + annee_delete + "\""
string_delete
string_delete# string_delete = "ansor is NULL"
'ansor = "2023"'
for i in ['rsa@rsa', 'rsa@actes', 'rsa@diags', 'rsa@rsa_um']:
delete_in_delta(i, string_delete)
24/09/08 23:15:17 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled.
Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize=
CodeCache: size=131072Kb used=48351Kb max_used=48384Kb free=82720Kb
bounds [0x0000000106880000, 0x0000000109830000, 0x000000010e880000]
total_blobs=17469 nmethods=16361 adapters=1019
compilation: disabled (not enough contiguous free space left)
2023 a bien été supprimé
%%time
'rsa@rsa') count_pivot_periode(
CPU times: user 133 ms, sys: 50.5 ms, total: 183 ms
Wall time: 60.5 ms
ansor | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
---|---|---|---|---|---|---|---|---|---|---|---|---|
str | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 |
"2013" | 9167 | 8806 | 9540 | 9158 | 8465 | 8679 | 9317 | 8032 | 8305 | 9309 | 8432 | 8751 |
"2014" | 9405 | 8525 | 9006 | 9063 | 8563 | 8986 | 9274 | 8263 | 9450 | 9926 | 8779 | 9518 |
"2015" | 9296 | 8970 | 9763 | 9019 | 8374 | 9561 | 9216 | 8171 | 9493 | 9999 | 9419 | 9905 |
"2016" | 9724 | 9591 | 10515 | 9912 | 9812 | 10216 | 9229 | 9272 | 10269 | 10187 | 10333 | 10567 |
"2017" | 10668 | 9656 | 11052 | 9818 | 10936 | 11431 | 10230 | 10274 | 10662 | 11118 | 10731 | 10361 |
"2018" | 10934 | 10013 | 11135 | 10196 | 10086 | 10838 | 10730 | 10364 | 10529 | 11423 | 10974 | 10944 |
"2019" | 11665 | 10526 | 11687 | 11162 | 10776 | 10660 | 11233 | 9841 | 10745 | 11357 | 10570 | 11238 |
"2020" | 11885 | 10942 | 10220 | 8568 | 9483 | 11289 | 11469 | 10046 | 11786 | 11417 | 10480 | 11335 |
"2021" | 11310 | 10858 | 12453 | 11879 | 11453 | 12084 | 11418 | 10650 | 12091 | 11910 | 11593 | 11981 |
"2022" | 11913 | 11075 | 12889 | 11559 | 12057 | 12120 | 11048 | 11280 | 12257 | 12171 | 11766 | 11766 |
for i in ['rsa@rsa', 'rsa@actes', 'rsa@diags', 'rsa@rsa_um']:
"moissor in ('03', '06', '09', '12')") delete_in_delta(i,
24/09/08 23:15:28 WARN MemoryManager: ..........
%%time
'rsa@rsa') count_pivot_periode(
CPU times: user 113 ms, sys: 22.2 ms, total: 135 ms
Wall time: 60.6 ms
ansor | 01 | 02 | 04 | 05 | 07 | 08 | 10 | 11 |
---|---|---|---|---|---|---|---|---|
str | u32 | u32 | u32 | u32 | u32 | u32 | u32 | u32 |
"2013" | 9167 | 8806 | 9158 | 8465 | 9317 | 8032 | 9309 | 8432 |
"2014" | 9405 | 8525 | 9063 | 8563 | 9274 | 8263 | 9926 | 8779 |
"2015" | 9296 | 8970 | 9019 | 8374 | 9216 | 8171 | 9999 | 9419 |
"2016" | 9724 | 9591 | 9912 | 9812 | 9229 | 9272 | 10187 | 10333 |
"2017" | 10668 | 9656 | 9818 | 10936 | 10230 | 10274 | 11118 | 10731 |
"2018" | 10934 | 10013 | 10196 | 10086 | 10730 | 10364 | 11423 | 10974 |
"2019" | 11665 | 10526 | 11162 | 10776 | 11233 | 9841 | 11357 | 10570 |
"2020" | 11885 | 10942 | 8568 | 9483 | 11469 | 10046 | 11417 | 10480 |
"2021" | 11310 | 10858 | 11879 | 11453 | 11418 | 10650 | 11910 | 11593 |
"2022" | 11913 | 11075 | 11559 | 12057 | 11048 | 11280 | 12171 | 11766 |
vacuum, à faire
Nettoyer régulièrement les fichiers parquet inutiles
Tests
On ne peut pas joindre les tables delta avec polars > on scan / filtre et on collecte les éléments par morceaux.
Par morceaux, on entend : entre parenthèses, on collecte les lignes correspondants à la table rsa@rsa
, puis la table rsa@actes
et on joint ensuite ces deux tables.
Dans notre exemple ici :
- on s’intéresse aux séjours-RSS chirurgicaux (GHM type C), sortis en mai 2020
- qui ont un acte de remplacement de l’articulation hanche ou genou par prothèse totale (actes N.KA), on filtre sur l’activité CCAM 1
- on rajoutera une clause ensuite sur la présence d’un diagnostic S72, fracture du col du fémur en position principale du séjour (position = 1)
# d'abord les rsa concernés par le filtre GHM C sortis en mai 2020
'rsa@rsa')
(remote_table(filter(pl.col('am') == '202005')
.filter(pl.col('rsatype') == 'C')
.
.unique()
.collect() )
# Ensuite les actes correspondants au pattern
'rsa@actes')
(remote_table(filter((pl.col('am') == '202005'))
.filter(pl.col('cdccam').str.contains('N.KA'))
.filter(pl.col('act') == '1')
.
.unique()
.collect() )
# Et la table des diagnostics
'rsa@diags')
(remote_table(filter((pl.col('am') == '202005'))
.filter(pl.col('diag').str.contains('S72'))
.filter(pl.col('position') == 1)
.'cle_rsa')
.select(
.unique()
.collect() )
rsa partie fixe & actes
%%time
# Jointure entre la table des RSA partie fixe et la table des actes
'rsa@rsa')
(remote_table(filter(pl.col('am') == '202005')
.filter(pl.col('rsatype') == 'C')
.
.unique()
.collect()
).join('rsa@actes')
(remote_table(filter((pl.col('am') == '202005'))
.filter(pl.col('cdccam').str.contains('N.KA'))
.filter(pl.col('act') == '1')
.
.unique()
.collect()= 'cle_rsa', how = 'inner'
), on 'nofiness', 'am').agg(pl.col('cle_rsa').n_unique().alias('count')) ).group_by(
CPU times: user 978 ms, sys: 347 ms, total: 1.32 s
Wall time: 463 ms
nofiness | am | count |
---|---|---|
str | str | u32 |
"290000017" | "202005" | 32 |
En alternative, on peut aussi utiliser les colonnes “stream” de la table RSA qui contiennent tous les actes, diags et permettent d’éviter de faire des jointures entre plusieurs tables quand l’information souhaitée est juste (présence ou absence de l’acte, du diag, etc.), c’est ce qui est fait ci-dessous
%%time
# alternative en utilisant la zone "stream" des actes en ligne
'rsa@rsa')
(remote_table(filter(
.'am') == '202005') &
(pl.col('rsatype') == 'C') &
(pl.col('stream_actes').str.contains('N.KA')))
(pl.col(
.unique()
.collect()'nofiness', 'am').agg(pl.col('cle_rsa').n_unique().alias('count')) ).group_by(
CPU times: user 1.05 s, sys: 252 ms, total: 1.3 s
Wall time: 431 ms
nofiness | am | count |
---|---|---|
str | str | u32 |
"290000017" | "202005" | 32 |
rsa partie fixe & actes & diags
On ajoute la clause sur le diagnostic principal S72
%%time
# Jointure entre la table des RSA partie fixe et la table des actes
'rsa@rsa')
(remote_table(filter(pl.col('am') == '202005')
.filter(pl.col('rsatype') == 'C')
.
.unique()
.collect()
).join('rsa@actes')
(remote_table(filter((pl.col('am') == '202005'))
.filter(pl.col('cdccam').str.contains('N.KA'))
.filter(pl.col('act') == '1')
.
.unique()
.collect()= 'cle_rsa', how = 'inner'
), on
).join('rsa@diags')
(remote_table(filter((pl.col('am') == '202005'))
.filter(pl.col('diag').str.contains('S72'))
.filter(pl.col('position') == 1)
.'cle_rsa')
.select(
.unique()
.collect()= 'cle_rsa', how = 'inner'
), on 'nofiness', 'am').agg(pl.col('cle_rsa').n_unique().alias('count')) ).group_by(
CPU times: user 1.27 s, sys: 179 ms, total: 1.44 s
Wall time: 507 ms
nofiness | am | count |
---|---|---|
str | str | u32 |
"290000017" | "202005" | 14 |
%%time
# alternative en utilisant la zone "stream" des actes en ligne, et la colonne dp
'rsa@rsa')
(remote_table(filter(
.'am') == '202005') &
(pl.col('rsatype') == 'C') &
(pl.col('stream_actes').str.contains('N.KA')) &
(pl.col('dp').str.contains('S72')))
(pl.col(
.unique()
.collect()'nofiness', 'am').len() ).group_by(
CPU times: user 1.04 s, sys: 148 ms, total: 1.18 s
Wall time: 389 ms
nofiness | am | len |
---|---|---|
str | str | u32 |
"290000017" | "202005" | 14 |
Nombre de lignes / colonnes des tables
Après avoir enlevé 2023 M04
'rsa@rsa').collect().shape remote_table(
(816519, 108)
'rsa@actes').collect().shape remote_table(
(2818046, 19)
'rsa@diags').collect().shape remote_table(
(4972199, 10)
'rsa@rsa_um').collect().shape remote_table(
(912675, 23)
Description des rsa
Avec la fonction describle
de polars
on affiche des statistiques surles données rsa.
Le choix a été fait d’ordonner les colonnes par ordre alphabétique.
'rsa@rsa').collect().describe() remote_table(
statistic | admission_maison_naissance | adnp75 | agean | agegest | agejr | am | anivgprec | anseqta | ansor | cat_nb_intervenants | cdgeo | cdpostal | cdpu | cle_rsa | confcdsej | conversion_hc | delaireg | dest | dp | dr | dtent | dtsort | duree | echpmsi | ell_forf_diabete | ell_gradation | genautorsa | ghm | ghm1 | ghshorsinno | ghsminore | gpcdretr | gpcmd | gpcompx | gpnum | gptype | … | novrsa | novrss | numinno | paslitsp | pc_raac | periode | poids | prov | rescrit_tarifaire | resererve_hosp | rsacdretr | rsacmd | rsacompx | rsanum | rsatype | rsavclass | schpmsi | sejinfbi | sexe | stream_actes | stream_das | stream_dpum | stream_drum | stream_um | suppdefcard | surveillance_particuliere | topadmnais | topctc | topradalimta | topradavastin | typedosim | typesej | typmachradio | typrestpo | uhcd | valvaort | zrdth |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
str | str | str | f64 | f64 | f64 | str | str | str | str | str | str | str | str | str | str | str | f64 | str | str | str | str | str | f64 | str | str | str | str | str | str | str | str | str | str | str | str | str | … | str | str | str | str | str | str | f64 | str | str | str | str | str | str | str | str | str | str | str | str | str | str | str | str | str | str | str | str | str | str | str | str | str | str | str | str | str | str |
"count" | "268230" | "345" | 788647.0 | 31919.0 | 27872.0 | "816519" | "78064" | "816519" | "816519" | "8339" | "816519" | "268230" | "0" | "816519" | "742" | "393" | 25849.0 | "57401" | "816519" | "393904" | "816519" | "816519" | 816519.0 | "816519" | "64939" | "154651" | "674035" | "816519" | "816519" | "601571" | "655769" | "816519" | "816519" | "816519" | "816519" | "816519" | … | "816519" | "816519" | "5" | "816519" | "333169" | "816519" | 18270.0 | "190281" | "0" | "3996" | "816519" | "816519" | "816519" | "816519" | "816519" | "816519" | "816519" | "816519" | "816519" | "626615" | "419122" | "816519" | "397444" | "816519" | "503187" | "9761" | "0" | "0" | "0" | "582256" | "155923" | "966" | "122544" | "816519" | "816519" | "1117" | "4299" |
"null_count" | "548289" | "816174" | 27872.0 | 784600.0 | 788647.0 | "0" | "738455" | "0" | "0" | "808180" | "0" | "548289" | "816519" | "0" | "815777" | "816126" | 790670.0 | "759118" | "0" | "422615" | "0" | "0" | 0.0 | "0" | "751580" | "661868" | "142484" | "0" | "0" | "214948" | "160750" | "0" | "0" | "0" | "0" | "0" | … | "0" | "0" | "816514" | "0" | "483350" | "0" | 798249.0 | "626238" | "816519" | "812523" | "0" | "0" | "0" | "0" | "0" | "0" | "0" | "0" | "0" | "189904" | "397397" | "0" | "419075" | "0" | "313332" | "806758" | "816519" | "816519" | "816519" | "234263" | "660596" | "815553" | "693975" | "0" | "0" | "815402" | "812220" |
"mean" | null | null | 58.678246 | 38.513675 | 55.50653 | null | null | null | null | null | null | null | null | null | null | null | 215.117722 | null | null | null | "2018-03-18 07:32:21.772000" | "2018-03-20 21:00:43.053000" | 2.561357 | null | null | null | null | null | null | null | null | null | null | null | null | null | … | null | null | null | null | null | null | 3183.633881 | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null |
"std" | null | null | 21.56475 | 2.825167 | 94.605443 | null | null | null | null | null | null | null | null | null | null | null | 86.785808 | null | null | null | null | null | 6.34697 | null | null | null | null | null | null | null | null | null | null | null | null | null | … | null | null | null | null | null | null | 712.20168 | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null |
"min" | "0" | "1" | 1.0 | 17.0 | 0.0 | "201301" | "0" | "2012" | "2013" | "A" | "1000" | "1170" | null | "1" | "1" | "1" | 0.0 | "1" | "A010" | "A010" | "2012-04-23" | "2013-01-01" | 0.0 | "0" | "0" | "0" | "0" | "01C031" | "01C031" | "22" | "0" | "0" | "1" | "1" | "0" | "C" | … | "220" | "116" | "INNOV1441001H" | "0" | "0" | "201301" | 100.0 | "1" | null | "1" | "0" | "1" | "1" | "2" | "C" | "11" | "0" | "0" | "1" | "AAFA001, ACQK001, YYYY467, ZZQ… | "A022" | "A010" | ", A169" | "00 C, 29 M" | "0" | "1" | null | null | null | "0" | "1" | "A" | "1" | "0" | "0" | "1" | "9620001" |
"25%" | null | null | 48.0 | 38.0 | 0.0 | null | null | null | null | null | null | null | null | null | null | null | 168.0 | null | null | null | "2015-10-24" | "2015-10-27" | 0.0 | null | null | null | null | null | null | null | null | null | null | null | null | null | … | null | null | null | null | null | null | 2870.0 | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null |
"50%" | null | null | 63.0 | 39.0 | 0.0 | null | null | null | null | null | null | null | null | null | null | null | 263.0 | null | null | null | "2018-04-30" | "2018-05-03" | 0.0 | null | null | null | null | null | null | null | null | null | null | null | null | null | … | null | null | null | null | null | null | 3280.0 | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null |
"75%" | null | null | 74.0 | 40.0 | 72.0 | null | null | null | null | null | null | null | null | null | null | null | 279.0 | null | null | null | "2020-10-02" | "2020-10-06" | 3.0 | null | null | null | null | null | null | null | null | null | null | null | null | null | … | null | null | null | null | null | null | 3630.0 | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null |
"max" | "0" | "2" | 119.0 | 44.0 | 365.0 | "202211" | "2014" | "2022" | "2022" | "C" | "9JC04" | "99422" | null | "141901" | "2" | "2" | 999.0 | "7" | "Z998" | "Z992+1" | "2022-11-30" | "2022-11-30" | 466.0 | "N" | "0" | "1" | "0" | "28Z24Z" | "28Z24Z" | "9999" | "1" | "252" | "90" | "Z" | "62" | "Z" | … | "227" | "121" | "INNOV1941008Z" | "1" | "2" | "202211" | 6640.0 | "U" | null | "2" | "252" | "28" | "Z" | "62" | "Z" | "11" | "9" | "2" | "2" | "ZZQX217, ZZQX162, HZHE002, HHQ… | "Z998, Z993" | "Z998, T543" | "Z992+" | "88 P, 04 C" | "1" | "2" | null | null | null | "1" | "4" | "B" | "4" | "3" | "1" | "1" | "9632020" |
Historique
On peut retracer les types d’actions réalisées avec les fichiers de méta-données json.
from deltalake import DeltaTable as DDT
= DDT("/Users/guillaumepressiat/Documents/data/delta/rsa/rsa")
dt dt.history()
[{'timestamp': 1725830160223,
'operation': 'DELETE',
'operationParameters': {'predicate': '["moissor#7581 IN (03,06,09,12)"]'},
'readVersion': 11,
'isolationLevel': 'Serializable',
'isBlindAppend': False,
'engineInfo': 'Apache-Spark/3.4.3 Delta-Lake/2.4.0',
'operationMetrics': {'executionTimeMs': '32936',
'numAddedBytes': '42490353',
'numAddedChangeFiles': '0',
'numAddedFiles': '10',
'numCopiedRows': '816519',
'numDeletedRows': '426077',
'numRemovedBytes': '69025074',
'numRemovedFiles': '10',
'rewriteTimeMs': '32476',
'scanTimeMs': '460'},
'txnId': '35d8d37b-cd5f-4ff3-b780-76f260da9392',
'version': 12},
{'timestamp': 1725830121484,
'operation': 'DELETE',
'operationParameters': {'predicate': '["(ansor#45 = 2023)"]'},
'readVersion': 10,
'isolationLevel': 'Serializable',
'isBlindAppend': False,
'engineInfo': 'Apache-Spark/3.4.3 Delta-Lake/2.4.0',
'txnId': '87d5d0fc-6b9c-49cb-9024-ea03a92d764d',
'operationMetrics': {'executionTimeMs': '4023',
'numAddedBytes': '0',
'numAddedChangeFiles': '0',
'numAddedFiles': '0',
'numCopiedRows': '0',
'numDeletedRows': '143463',
'numRemovedBytes': '7880925',
'numRemovedFiles': '1',
'rewriteTimeMs': '878',
'scanTimeMs': '3144'},
'version': 11},
{'timestamp': 1725830110895,
'operation': 'WRITE',
'operationParameters': {'mode': 'Append'},
'clientVersion': 'delta-rs.0.19.1',
'operationMetrics': {'execution_time_ms': 269,
'num_added_files': 1,
'num_added_rows': 143463,
'num_partitions': 0,
'num_removed_files': 0},
'version': 10},
{'timestamp': 1725830083480,
'operation': 'WRITE',
'operationParameters': {'mode': 'Append'},
'clientVersion': 'delta-rs.0.19.1',
'operationMetrics': {'execution_time_ms': 259,
'num_added_files': 1,
'num_added_rows': 141901,
'num_partitions': 0,
'num_removed_files': 0},
'version': 9},
{'timestamp': 1725830056673,
'operation': 'WRITE',
'operationParameters': {'mode': 'Append'},
'operationMetrics': {'execution_time_ms': 258,
'num_added_files': 1,
'num_added_rows': 139680,
'num_partitions': 0,
'num_removed_files': 0},
'clientVersion': 'delta-rs.0.19.1',
'version': 8},
{'timestamp': 1725830030232,
'operation': 'WRITE',
'operationParameters': {'mode': 'Append'},
'clientVersion': 'delta-rs.0.19.1',
'operationMetrics': {'execution_time_ms': 244,
'num_added_files': 1,
'num_added_rows': 128920,
'num_partitions': 0,
'num_removed_files': 0},
'version': 7},
{'timestamp': 1725830005855,
'operation': 'WRITE',
'operationParameters': {'mode': 'Append'},
'operationMetrics': {'execution_time_ms': 238,
'num_added_files': 1,
'num_added_rows': 131460,
'num_partitions': 0,
'num_removed_files': 0},
'clientVersion': 'delta-rs.0.19.1',
'version': 6},
{'timestamp': 1725829980922,
'operation': 'WRITE',
'operationParameters': {'mode': 'Append'},
'clientVersion': 'delta-rs.0.19.1',
'operationMetrics': {'execution_time_ms': 227,
'num_added_files': 1,
'num_added_rows': 128166,
'num_partitions': 0,
'num_removed_files': 0},
'version': 5},
{'timestamp': 1725829956726,
'operation': 'WRITE',
'operationParameters': {'mode': 'Append'},
'clientVersion': 'delta-rs.0.19.1',
'operationMetrics': {'execution_time_ms': 223,
'num_added_files': 1,
'num_added_rows': 126934,
'num_partitions': 0,
'num_removed_files': 0},
'version': 4},
{'timestamp': 1725829932609,
'operation': 'WRITE',
'operationParameters': {'mode': 'Append'},
'clientVersion': 'delta-rs.0.19.1',
'operationMetrics': {'execution_time_ms': 216,
'num_added_files': 1,
'num_added_rows': 119630,
'num_partitions': 0,
'num_removed_files': 0},
'version': 3},
{'timestamp': 1725829909939,
'operation': 'WRITE',
'operationParameters': {'mode': 'Append'},
'clientVersion': 'delta-rs.0.19.1',
'operationMetrics': {'execution_time_ms': 196,
'num_added_files': 1,
'num_added_rows': 111186,
'num_partitions': 0,
'num_removed_files': 0},
'version': 2},
{'timestamp': 1725829888762,
'operation': 'WRITE',
'operationParameters': {'mode': 'Append'},
'clientVersion': 'delta-rs.0.19.1',
'operationMetrics': {'execution_time_ms': 188,
'num_added_files': 1,
'num_added_rows': 108758,
'num_partitions': 0,
'num_removed_files': 0},
'version': 1},
{'timestamp': 1725829868162,
'operation': 'WRITE',
'operationParameters': {'mode': 'Append'},
'operationMetrics': {'execution_time_ms': 182,
'num_added_files': 1,
'num_added_rows': 105961,
'num_partitions': 0,
'num_removed_files': 0},
'clientVersion': 'delta-rs.0.19.1',
'version': 0}]
from os import system
'tree /Users/guillaumepressiat/Documents/data/delta/rsa/rsa') system(
/Users/guillaumepressiat/Documents/data/delta/rsa/rsa
├── _delta_log
│ ├── 00000000000000000000.json
│ ├── 00000000000000000001.json
│ ├── 00000000000000000002.json
│ ├── 00000000000000000003.json
│ ├── 00000000000000000004.json
│ ├── 00000000000000000005.json
│ ├── 00000000000000000006.json
│ ├── 00000000000000000007.json
│ ├── 00000000000000000008.json
│ ├── 00000000000000000009.json
│ ├── 00000000000000000010.json
│ ├── 00000000000000000011.json
│ └── 00000000000000000012.json
├── part-00000-7eb13227-ce82-497f-84fb-e767416c6016-c000.snappy.parquet
├── part-00000-d831c120-2af5-4a49-9599-958e815b00ad-c000.snappy.parquet
├── part-00001-0e6a7d99-4ffc-4775-8318-27178f63e39a-c000.snappy.parquet
├── part-00001-2ac368f0-6f07-4766-a06b-aaa20ad1d6a7-c000.snappy.parquet
├── part-00001-61fa5259-a82e-4256-b0eb-be26c19a1a29-c000.snappy.parquet
├── part-00001-76418848-c673-403d-9a7a-c773c856ee3c-c000.snappy.parquet
├── part-00001-77c542d0-ba31-4fdc-a30c-06c334fc3b5c-c000.snappy.parquet
├── part-00001-7b0eb38f-e0be-4eb4-8138-3dbf30978d4c-c000.snappy.parquet
├── part-00001-80b59950-28a5-4481-8b03-7dcbe88d4dae-c000.snappy.parquet
├── part-00001-9f431d18-d08a-4d2d-8557-e9a979699a90-c000.snappy.parquet
├── part-00001-ab213811-43bd-4a3a-9023-1c2912f038fa-c000.snappy.parquet
├── part-00001-caf7bf87-5b80-4df2-8b1e-fac7b836578a-c000.snappy.parquet
├── part-00001-d62c2a4e-4733-4754-982e-ef3cb1f7cb9b-c000.snappy.parquet
├── part-00001-fb057c18-9f16-4761-8631-27bbae0ae08e-c000.snappy.parquet
├── part-00002-779afa11-e83a-44dd-9c7d-12849d36d9f3-c000.snappy.parquet
├── part-00003-fdd6d234-ab8e-434e-9ab2-7d693451dafd-c000.snappy.parquet
├── part-00004-a206b937-6be0-4679-8a1e-918b49046047-c000.snappy.parquet
├── part-00005-7557bc83-6022-484e-a4e5-f464160d49aa-c000.snappy.parquet
├── part-00006-8c380bc5-f433-4e24-b713-f4ef67cbdf4d-c000.snappy.parquet
├── part-00007-bc244fe1-7a8e-4278-9698-4e3af583f873-c000.snappy.parquet
├── part-00008-b867aaf6-c4e9-417d-9d12-d0f0862c2bab-c000.snappy.parquet
└── part-00009-a60554e9-66b9-465b-8b31-f55ab0e81a4d-c000.snappy.parquet
2 directories, 35 files
0