The Parquet Format and Performance Optimization Opportunities Boudewijn Braams (Databricks)

163.77k views6902 WordsCopy TextShare
Databricks
The Parquet format is one of the most widely used columnar storage formats in the Spark ecosystem. G...
Video Transcript:
Thanks hey everybody so my name is barb and brahms I'm a software engineer at data bricks I'm part of the storage and i/o team and today I'm gonna be talking to you about the park' format and performance optimization opportunities that it presents so before we dive into the specifics let's just take a step back and look at what we do when we do data processing and analytics so we start off with a data source be it a text file a CSV file XML or JSON okay whatever you feed it into a data processing or querying
engine like spark and what you get in the end is like new insights transformed data in the case of an ETL pipeline or whatever there's a lot of complexity in the data processing and querying engine it won't be going into that that much except maybe for the part of efficiently reading in the data but will mostly be focusing on the data sources of course and it's specifically on park' a quick overview of of the presentation I'm going to be talking about data storage models in general the pirkei format of course and the optimization opportunities that
represents so let's take a broader look at data sources and formats first so broadly speaking you have this this spectrum right whereby you have unstructured data on the one hand which gives you a lot of flexibility but it's it's a really high parsing overhead and on the other hand you have structured formats which are more efficient storage wise but also offer better performance and like let's let's give a description of these of these three categories so you have the unstructured data these are stuff like text files artists like raw texts not actually CSV it should
not be in that category but it is for some reason but it's in essence they're known there's no notion of a record or there's no well-defined schema so it's like it gives you a lot of flexibility like it's it's yeah it's super flexible but it's really hard to parse there's gonna be a huge overhead there semi-structured data is a step in the right direction in this case these are things like xml files or csv files or json files in this case you do not necessarily have a well-defined schema but what you do have is a
notion of a record or a row which is kind of nice and it improves the the ease of parsing and this way speeds up your queries on the other end we have structured data formats which is the most nice to work with there they're the fastest they have the least parsing overhead most efficient storage you have both a notion of records and rows and a well defined schema what makes it performant is that of course are very predictable locations for where each data element is going to be and park' is one of these structured data
formats so how do we actually store a structured data format on disk at the logical level you always have this notion of a table right you have a rose and a row has columns so how do you actually store this on disk there's basically three ways in which this is done there's the row wise model in which what you do it's it's basically a one-to-one mapping of your your logical model to the to the physical model where you buy you just store the column values of each row back-to-back you have the columnar model where what
you do instead is you take all the column values of the first column store these back-to-back and do this for the second column and for the third column and so forth and there's of course there's also a hybrid model which kind of combines these two two models where you have both horizontal partitioning of rows and within that horizontal partition you do vertical partitioning but to analyze these dren lies the performance of these different models you have to of course recognize that there's different workloads and that they're gonna behave differently under different workloads so broadly speaking
there's two types of workloads there's the traditional OLTP workload which is online transaction processing and what this is is they're their workloads with lots of small operations evolving entire roads like for example lots of inserts or lots of deletes or update operations well the essence here like what I'm trying to convey to you is that these operations evolve entire rows o LEP is kind of our domain like it's it's it's stands for online analytical processing and the queries are very different like they are usually only a few very large queries operating on a subset of
all the columns so not the entire rows usually they're very large scans and then the end result is a very small selection of the rows and within that small selection of the rows a small subset of the columns the main assumption I have for the rest of this discussion is that i/o is expensive like be it from memory or from disk or from the network so let's look at these these models in terms of in the context of these different types of workloads so first off you have the the row wise model like I said
it's it's it's the most intuitive one it's basically a one-to-one mapping of your of your logical model to the physical model so if you have a roll here with three columns you map it on disk like this it's very straightforward it's very well-suited for all TP as you can imagine because you have let's say you have an insert operation like what you can do is just append all these column values to the end of your file if you have an update operation you just find the location and you updated the column values in place there
if you have a delete you just look it up and you delete the values there it's not so good for all ap on the other hand because like I said you only interested in a subset of the columns and this this model works on the entire is on the basis of entire rows so you're gonna be wasting i/o reading column values that you're not gonna need and as a as we've assumed IO is expensive so like that's that's not good yeah so basically what we have is horizontal partitioning so our partitioning on the on the
row level sorry the columnar model I've explained a bit so instead of storing all the column values of a single back-to-back what you do is you store all the values of all the columns for all rows back-to-back so in this case if we have a row like this with the column that is a 0 B 0 and C 0 they would end up here here and here and this this is vertical partitioning so instead of doing the horizontal partitioning on the row base you do it on a on the notion of the columns this is
not very well suited for all TP way to think about it because if you're gonna have to insert a record you're gonna have to insert these column values at various different locations that which are going to be spread out in memory and if you have a big big data set or a big big file it's going to be kind of inefficient you're gonna have a fragmented memory access pattern you're gonna have to like update all these values there or insert values there it's computers don't really like fragmented memory access patterns right for only P on
the other hand this is this is this is good because like we've said we're only interested in the subset of the columns so if you're only gonna want to read a or B you're only gonna read column a or b essentially what you get is free projection pushdown which is a fancy term for column pruning which is just a fancy term for only reading the columns that you're interested in also a nice thing about this is that since their values you're gonna store back to back are from these same value domain it presents the ice
compression opportunities and you can do stuff like run length encoding which which I'll get into later so let's compare the two models in the context of an ol AP query let's say you have a query that's interested in column a in column C and just those and you would like to access it with the row wise model what you would end up with is this memory access pattern which kind of sucks right it's it's very fragmented and computers don't really like fragmented memory access patterns like even I considering modern memory subsystems you're not gonna be
able to if you're if your value is small enough you're not gonna be able to read just that value like if you're gonna be reading from memory you're only you're always going to be reading at the join you'll arity of cache lines which is 64 bytes and if you're gonna be reading from disk it's going to be an operating system sized page which is four kilobytes and if you're going to be reading from from the network it's gonna be some other some other granularity if we look at the color model on the other hand we
get this nice sequential memory access pattern and it turns out a computer's really like sequential memory access patterns you can do stuff like cache prefetching or prefetching from this you're not gonna be wasting i/o on reading columns column values that you're not gonna need anyway so all good right not really like if you want to do row reconstruction like you still want to be able to get rows in the end the columnar model might not be as optimal as you'd like because if you're let's imagine you have a hundred gigabyte a data set and you
have ten columns and you want to reconstruct the rogue are part of a row consisting of the first the third and the last column you're gonna have to read the first ten gigabytes to get the first value and then the third hanging you buy some than the last time gigabytes it's gonna be very all over the place and you're gonna lose the nice locality properties and the modern computers and modern memory subsystems are kind of built around making things fast because of locality and caching so what's the answer you probably have guessed it it's the
it's the hybrid model so in this case you will store your row your logical row like this I will end up here so what we do is we apply both horizontal partitioning of on the rows so in this case we do horizontal partitioning every three rows and within those three rows you do the you apply the calendar model this is used by parquet and other corner formats such as or C and basically gives you the best of both worlds in the sense that you get the nice locality properties of the the row wise model and
the nice I Oh properties of the columnar mod so we finally get to parquet quickly it was initially an effort by Twitter and cloud era open stores storage format uses the hybrid model in the scientific literature this referred to px or partitions attributes across model it's widely used in the SPARC and Hadoop ecosystem and one of the primary formats used by data books customers so what does the parking file look like on disk is that actually a single file the logical file is defined by a root directory and this root directory contains one or multiple
files directly or it contains a subdirectory structure defined by a partitioning scheme which I'll get into later but you can end up with something like this so how is data actually organized within such a pocke file like I said you have both horizontal partitioning and vertical partitioning the horizontal partitions in pork' are called row groups you can have multiple row groups in a single file and the default size for those is 128 megabytes and within those row groups you store you apply your you do your vote or your vertical partitioning or you store your your
columns or data in a columnar fashion and so-called column chunks and within the column chunks you store the actual data in so called data pages and the data pages have things like metadata such as the minimum value the maximum value the number of values in that page reputation levels and definition levels which I won't get into but it's basically a means to reconstruct nested schemas and you have your encoded values themselves one thing to note is that you also have metadata at the row group level but these are stored in the in the footer and
I'll get back to why that is nice to have all the metadata in one single location later so how do you actually store your values deeper case specification features around 6 and coding schemes but most of them are in the the v2 spec which best to the best of my knowledge is not completely finished or at least not really used in production yet but someone might correct me on that later but the main ones you're gonna be with for the v1 speck is played in Coney and dictionary encoding so planar coding is what it sounds
like it's plain so you if you have fixed with values you're gonna be they're gonna be stored back to back so if you have integers you're just going to throw them back to back if you have non fixed with data the length is prefixed to the data element to denote when you can when you can stop reading what things get interesting is with hourly dictionary encoding which is the combination which combines several techniques like run length encoding bit packing and dictionary compression as based on the assumption that in your data you're going to be you're
gonna have duplicate and repeated values I think this is where you think about it's a fairly valid assumption right it's the imagine you have a data set of 10 billion rows and you have a country column there's only so many countries in the world so you're gonna be repeating those values and they might actually so you're gonna have duplicate values it might actually also be repeating those values so quick example of the the counter example that I just gave if you have your uncompressed data like this there's several values and there duplet there's duplicates and
there's repeated values and so you have the United States you see it's it's a it's the last element as well the Netherlands is repeated and France has a duplicate somewhere else so the first thing like the porque encoder does when it tries to encode this data is to keep track of all the unique values that it encounters and then you end up with the dictionary so in this case you have four unique values in your original data set a United States France Germany and the Netherlands and basically what this gives is you could sort of
conceptually see it as an array with at the index being like the dictionary code which you can use to encode your original data which we've done in this example so instead of storing the values explicitly you reference the the dictionary element what it does then is it applies run-length encoding which is a technique whereby instead of storing repeated values explicitly what you do is you throw the repeated value and the number of times it's repeated so in this case the Netherlands has repeated three times and the dictionary code three is repeated three times so instead
of storing that explicitly and wasting space on the repeated values you store the value that's repeated three and the number of times it's repeated the last thing that is done is a bit packing which is a nifty little technique whereby you only use the maximum number of bits you're gonna need to store the actual values so like I said you could like conceptually treated as an array with the array index being the dictionary code like you can store this in a 32-bit integer for example but if you look at this dictionary it only has four
unique values and how many bits do you need to store the number zero one two and three at max it's gonna be a max two bits right so why not just use two bits to store all these values instead of the full 32 I mean you can see that if you have a relatively small dictionary this can lead to some significant space savings so finally he gets it some some optimizations first thing to do is just accommodate for Dictionary encoding I mean if you have smaller files it means less AO and we've assumed that it
was expensive so your query is likely going to be faster there's a thing to note here though that you only have a single dictionary per column chunk which was the vertical partition in the horizontal partition what is called a row group so y parky is encoding your data it's gonna keep track of the dictionary and it's going to reach a limit there's a size limit to that due to the dictionary and if it's gone so what happens if a dictionary gets too big Parkay is gonna fall back to plain so your dictionary is filled up
and the rest of the data is just gonna be encoded using the plain encoding format and it's gonna miss all the compression opportunities and I guess not good because if I was gonna get bigger so what do you do to combat this there's basically two things you can do very easy things you can increase the dictionary sighs it's a configurable parameter as this park humph poke a dictionary page size the other thing you can do is just decrease the row group size this makes sense right because if you decrease the Eurogroup size you're going to
decrease the number of rows which means you're going to decrease the number of values in the column which in turn increases the probability of those values all the unique values in that subset fitting in the dictionary how do you actually inspect this whether this has any effect if you really want to get down to it you can use parkade tools which can be found in the open source became our repo you can just compile it and run it on pakka files and you can dump stuff like the actual data or metadata about the perk a
file so in this case I ran it on a file that I have it shows me that I have a row group 0 with a column ID and within that column I have all these all these data pages in this case is not actually dictionary encoded as you can see it's there's VLE in the middle and that's the value encoding and it says plain but that's because this is a primary index column so all the values are unique so it doesn't make any sense to throw them a dictionary encode them so another thing you can
do Arlette pocket allows for is page compression on entire data pages so you can outright compress entire data pages using a variety of compression schemes such as snappy or gzip or l0 there's a spark humph you can use to to set this but it only of course makes sense if the savings that you have on Io by having a smaller file outweigh the cost of having to decompress the data it turns out that that actually is the case for lightweight compression schemes such as snappy I run a simple benchmark here simple thing you get by
data set I put it on as three and then just do a spark read ok collect on the entire thing and when you have the data in the uncompressed form it takes about like 14 seconds to read and if you have the exact same data set compressed using snappy and it's six gear by the actual query just reading the data the exact same data takes around eight or nine seconds which is quite a significant difference right it's practically always enabled by default but it's still good to be to be aware of so let's look at
some more interesting optimization techniques let's say you have a simple query like this so a select star from your table where the column X is greater than five remember I talked about the the min max statistics in the for the row groups that we have they're all stored in the footer of the park' file so if you that's one of the first things SPARC does RM Park ADA's when reading a perk a file is you read the footer and then you have all this metadata at your disposal and what happens then is then you can
reference this you can leverage this metadata to do row group skipping so let's say you have all this metadata for for example therefore these three row groups and you'll have your predicate X is greater than 5 if you look at the the min and Max values of the row of the first row Gopi C we have a min value of 0 and a max value of 9 so you basically it's guaranteed that there's gonna be a value that satisfy your predicate in that row group like there's at least one value that has the value 9
which means you're gonna have to read the row group but if we look at the third row group we see that we have a min value of 1 and the max value of 4 which basically means that we do not have to read the row go because there's no way there's gonna be value in there that's going to satisfy our predicate and as you might remember the row groups can be 128 megabytes in size so skipping 128 megabytes is a good thing yeah you can enable this using this spark config which allows which will tell
SPARC to push down the predicates that it receives into the park a reader it's also practically an a always enabled by default but still it's it's a good thing to be aware of however doesn't always work like it doesn't work well on unsorted data if you have a large value range so you have a very low minimum value in a very high maximum value then the chances are that the predicate that you have is going to be for some value in the middle of that right so what'd he do to to combat that one of
the things you can do is to pre sort your data on the on the vertical column so you get more narrow min Max ranges another thing to be aware of is to always use tie predicates so to match the predicate that you supply and spark and the actual column type so don't rely on any internal casting or conversions because they unfortunately do not always get it right so for example use actual lungs in your predicate if you have a long column so do not use an integer in your predicate so what if we have what
about an equality predicate let's look at the same example that we had before but then we do an x equals 5 so if you look at the metadata for example in the first case where the minimum value 0 and the maximum value 9 so as the value of 5 in there I mean could be right doesn't have to be it could be all zeros and 1 9 it would still produce the same in mac statistics so we're still gonna have to read that row group but one nice nice thing that pirkei has is a thing
called dictionary filtering so if you remember the dictionary is a collection of all the unique values that you have in that column chunk and it's like it's stored and in the beginning of the column chunk so what you can do instead of reading the whole thing is you reference the dictionary see is there one value that is equal to 5 in there and if there's not you can like with 100% guarantee skip that row group because there's not going to be a very dense dictionary coded with a reference to the value file so that's good
we can do doom or skip I think this is a very very cool feature it can be enabled using the following park' coughing oh thing that we can do is like a more high-level form of predicate pushdown whereby you embed the predicates that you have into the actual directory structure of the file this is very useful if you know the predicates of your workload before I guess say you always have a query and a pair you have a query that always has a predicate on the date column what you can do is you can do
a partition by on the date column of your data frame and then what partying spark does is will generate this sub directory structure for you so you have these sub directories within your logical perc a directory with the specific values for that column and then all the files in that specific subdirectory will have those values for that column so essentially what you're doing is you're translating predicate evaluation into just figuring out which files you should read and this works well like I said if you know the the predicate beforehand but it can also be problematic
like if your workload changes or if your predicates change then this can work against you because you might end up with having to read more files then you would have if you had not applied this partitioning scheme which brings me to my next point which is like sort of the root of all evil basically which is having many small files so remember that for every every file you have to set up internal data structures you have to instantiate the reader objects you have to fetch the file you have to parse it like this it's a
lot of overhead and to demonstrate this I ran another simple simple benchmark so the same 10 gigabyte data set on stored on s3 and you just read it in nothing special but if you have the data set in 16 files we see that it takes about 12 and a half seconds to read the thing with VF if we have the exact same data set in a thousand 24 files we see it suddenly takes almost 19 seconds like this is this is a significant difference like this is something everybody should be aware of that if you
have a lot of small files this overhead is gonna I can play a big part I have a significant contribution to your total total query time so what can do to to combat this well basically there's manual compaction right you can just take your data frame and you can repartition it you can do that in these ways you can call a report ition directly which does a shuffle and then you with files that are of the same size spread over the number of partitions that you provided as a parameter or you can use coalesce which
there's pretty much the same thing but it avoids the shuffle which as a result you might end up with files that are unequal in size but it's still going to have the provided number of partitions a thing to note with this problem is that you should always watch out for incremental work or output let's say you have a van ETL job that runs every hour and pence data to a table and per carry what this basically means is that you're gonna be writing a new file and you can imagine if this runs every hour you
let it run for weeks and months you might end up with thousands of or tens of thousands of small files and as we've seen that can be that can be problematic problematic if the files are small enough that being said the opposite is also is also true avoid having very few huge files this is a story that a colleague of mine told me they were debugging an issue with a customer whereby they were ingesting data per day they would add 250 gigabytes coming in per day and they were trying to see what the best way
was to compact that so in one case they would use 250 partitions and in the other case they would just dump the whole thing in one file and there were seeing some weird performance some weird like performance issues there so to debug this they just did a simple query like a select count store on the entire thing and for the data set in 250 partitions this took about 5 minutes which is kind of good it's a it's nothing nothing strange but then for the one huge partition I guess how long this took like it's it
should make sense that this is faster right because you're avoiding having many files it took over an hour to complete which is crazy right if you think about it so why is this if you have a huge file what else like if you're if you're 5 if you're broke a file is huge another thing that also becomes very huge and that is the footer because you're gonna have a lots of row groups there's going to be lots of metadata that you're gonna have to store and turns out that the footer processing is just not optimized
for speed so you have to be be aware of this as well but coming back to to my original the original problem of having many small files I mean you can do manual reporting all right you can just go to your data and just tell everybody ok don't do not touch it I'm gonna be partitioning our data to solve this issue I mean ideally we want to we want to off to up to my earth sorry we want to automate this process but like I said what about concurrent access like are you gonna just gotta
tell everybody ok stop touching the data if you have many different organizers or many different teams in an organization like this does not scale you cannot tell everybody ok stop touching their data we're gonna be repurchasing now like it doesn't work like what you're gonna need is some form of isolation of your operations for example you need asset transaction Ally on that would be really nice to have so is there anything first for camp ok that we can use I guess nobody sees this coming there is Delta Lake yay so yeah what is Delta Lake
it's an open source storage layer on top of park hae-in spark I provides you with asset transactions and stuff another other cool stuff such as time travel it does so by keeping track of a right ahead log and if you just store all the old files that you still have there's a way to backtrack and then see the older versions of your table there's other cool stuff which you probably have heard about these these last days but what it does allow us to do like it gives you the tools to actually implement automatic compaction because
you're not gonna have to worry about concurrent access which is exactly what we did at dataworks is implement this so we implemented the optimize function our sequel call which does exactly that it its restructure repetitions your data into more agreeable file sizes this can also be done automatically right we can just flip a flag and say repetition whenever it's deemed that these file sizes are are not aligning or any more and that of agreeable sizes there's additional stuff that we can do since we have the the right ahead log which is basically a collection of
commits referencing park' files we can store additional data there right we could store things like the min/max statistics there which is exactly what we've done and that we can do skipping based on the min/max of entire files in addition to the rogue group skipping that's already there and the cool thing about this is that it these commits are stored in JSON files but every 10 commits we create a parka file that stores the the information in those 10 commits which means we can read in this metadata using spark which means that it can grow very
big and still be scalable yes a scalable metadata processing because we're using spark to read the data but also read the metadata so there's no additional metadata layer on top of it it also allows us to do cool stuff like Z ordering which is basically a neat way in which you can cluster your data on multiple columns I mean you can do sorting on multiple columns anyway right but you end up with a major minor sort where you would first sort on the first column and then on the second but the effect is that it
will always by us the first column that you chose so Z ordering uses like space-filling curves to create narrow min/max values for multiple columns so yeah let's quickly look at what what i've discussed today so basically we want to reduce I Oh how do we do this by reducing the size of the file which can be done by using a page compression in park' and to accommodate for the dictionary encoding that we have we can avoid reading in irrelevant data and so we can do a row group skipping based on the min/max values in the
at the Roberge level by reading the footer and we can do dictionary filtering by applying by evaluating the predicate on your dictionary instead of the whole row group our we can leverage Parque partitioning so the embedding the other column values in your sub directory structure we've seen that we should reduce overhead or at least keep this keep this in mind by avoiding having many small files or as the example demonstrated a couple of very big ones I would briefly talk to you about Delta Lake which is a layer on top of Parque for spark that
gives us it allows us to implement stuff like auto optimization or auto reporting and compaction of your data set and gives you additional skipping abilities by storing additional metadata in your traditional statistics on your data set in the in the right ahead lock which then can be read using spark which is pretty cool and we can do stuff like Z ordering which is also kinda nice so yeah that's all from me thank you for for listening Thank You bogan we've got about five minutes we got two mics on either side feel free to come forward
and ask questions if you have any thank you yeah hi thank you for your talk and you mentioned partition by and the state frame rights rate as also has a big bucket you mentioned partition pie for this data frame writer but the different writers also have bucket body can you explain what's the difference between partition by and packing by when would you recommend one over the other so partition by is more explicit in the sense that it it just takes all the column values that it finds and it creates sub directories for for all those
column values so it makes sense if you have a limited set of column values like for example in the case of dates it doesn't make sense if you have a whole bunch of like if you have tens of thousands or hundreds of thousands of values you can end up with lots of small files well bucketing does is it a bucket like says it buckets multiple files based on a hash of column values so that you end up with sort of a group of of rows that have similar column values but not it's not based on
these distinct column values so I guess it makes bucketing makes sense in the case that you have a lot of a lot of distinct column bodies that in which case a partition by would result in having like a thousands or tens of thousands of sub directories which isn't always it's not as performant as it should make so I guess so I guess that park it dust or also some metadata that that this data is pocketed so that sport can then improve on reading or joining later so let's say I have my data frame I say
bucket vayan write it and then when I read it with is this information stored in Park it so that I know okay this data is actually bucketed and in and it can improve things I don't believe it is I think it's just like the physical layout this what determines the bucketing I don't think there's any additional metadata but it should increase performance by the fact that you're buckling on specific sub branches of the column values okay thank you okay go ahead yeah first of all thank you for a great talk so my understanding is the
orders or only comes to play when bucketing and partitioning is not working right because it's for multiple column so but can we say still like the order is something like bucketing and at this the second question is like and Zi Audrey is only for like finding something like are doing search or it can also improve join or like it can also optimize the joins like a equi-join sand can we compare that with a at the same level has we can optimize the join on partitioning and the bucketing or can you tell me something about that
so your first question was what's the the Z order in come into play yeah as you order come into play when bucketing and partitioning is not working because it is from multiple columns right and yet at the same time can we see like it's buckling and the order is something similar yeah I guess I guess they're both means of clustering your data in a more efficient way so you can and then read it in in a more efficient manner so I guess yeah and the other question is like is it going to improve the performance
of the joins how I see yeah yeah I guess yeah I guess any means of more efficiently reading in your data which in turn means reading less data that's basically what you're trying to do right because you're even with Z ordering you're trying to emit at max or to optimize in min and Max values which even when you're joining data on a specific column then if you're gonna have if you're going to be reading less data in this place and rest data on the other side the join then yeah your journey is gonna be okay
we only had 30 seconds so make sure I make you a question really sure so thank you for a great you're great role so my question is about the rows keeping so I understand that three is the metadata about min Max and dict filtering you two to have this feature and my question is about non numerical data if we have for example some text data but is not duplicated so much there is some metadata on park' to to have optimized query on this way so yeah there's a packet there's actually min max on non numerical
data as well so like strings and dates but yeah if you yeah I mean min max data is irrelevant of the it's if you're if you have your column which in which case you do not have many duplicated or repeated virus and you can still have these statistics which could make sense so yeah I guess if I order my for example my corner my data before writing in on the this corner I will have the the min max on this non-numerical give me hand to whirlwind again for wonderful talk about the guts of parquet thanks
a lot
Related Videos
Copyright © 2025. Made with ♥ in London by YTScribe.com