Welcome, everybody, to this introduction to Apache Spark architecture. My name is Emil Kaminsky, and today we will talk about things like the driver and workers, and we will learn about the Spark UI, RDDs, transformations, and many other things. Whenever you are planning to do any kind of data engineering job with Apache Spark, this is essential, but also when you are working as a data scientist or data analyst this knowledge will come in handy: it will help you troubleshoot things or optimize your flows. The way things are organized today, we will spend around 50% of our time on the theory, for which I have a couple of slides prepared, and for the second 50% we will actually open Databricks, run a couple of queries, then open the Spark UI and try to understand what was happening under the hood. Let's get started.

First things first: if we open the Apache Spark documentation, this is the definition we are going to get, and it's actually pretty important. Apache Spark is a unified computing engine and a set of libraries for parallel data processing on a computer cluster. So why is this important?
Well, first of all, Apache Spark was never designed as a storage system. The thing it does well is take data from one place, do some magic, some transformation, and send or save it to another place. The second thing: Spark really shines on a computer cluster, meaning whenever you have enough data that you need multiple machines to analyze it, Spark is the way to go. And the other way around: if you have a small amount of data, data which fits on your personal computer, there are already better solutions than Apache Spark, even pandas in Python, and there is absolutely no need to learn Apache Spark.

Now, talking about the libraries or modules, the ecosystem of Spark is pretty rich. The first and most important is Spark SQL, which allows us to write SQL-style queries against Apache Spark; today we'll be doing that mainly with Python. We also have modules for stream processing, for distributed machine learning, and for analyzing graph data (this last one is not a very well-developed module), and at the end we have something for R fans: if you like the R language, you have a dedicated module in Apache Spark which allows you to use it.
Talking about languages, Apache Spark was mainly written in Scala, but you don't need to know Scala to use Apache Spark: you can use Java, Python, SQL, or R. There are some benefits to Scala, but in 99.999% of cases you don't need it, and if you don't even know what it is, you can think of Scala as some kind of Java; it also runs on the Java Virtual Machine, the JVM. We can find Apache Spark in many different places: in Databricks, which is where the founders of Apache Spark work, but also in all the main clouds there is some flavor of Apache Spark.

Let's talk about the main components. I'm sure you have heard the terminology of driver and executor. Both of them are processes which run on the JVM, the Java Virtual Machine, and usually the driver and each executor run on separate virtual machines, for the sake of manageability and avoiding conflicts. It can happen that we have two executors on one virtual machine, but it's not common. There is also a third very important element called the cluster manager, like Apache YARN, Mesos, Kubernetes, or Spark Standalone; this is the component which guarantees delivery of executors and drivers for us so we can run our application.

So let's say we have data which we want to analyze and we write an application; we can write it, again, in Scala, Java, Python, SQL, or R. We write the application, and the application communicates with the driver through something called the SparkSession. That's our only entry point to Spark.
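As an illustration, here is a minimal sketch of how an application typically obtains that entry point in PySpark; the application name is a placeholder, not something from the recording:

```python
from pyspark.sql import SparkSession

# Build (or reuse) the single SparkSession — the application's only entry point to Spark.
# On Databricks a session called `spark` already exists, so this is mainly for stand-alone scripts.
spark = (
    SparkSession.builder
    .appName("intro-to-spark-architecture")   # hypothetical application name
    .getOrCreate()
)

print(spark.version)   # answered by the driver; no executors are involved yet
```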
Now the driver takes the application and tries to figure out what we want to do with the data. Once it figures out the most efficient way of executing that application, it splits it into tasks, which it distributes to the executors. Each executor can process multiple tasks in parallel: as many tasks as it has execution slots, and it has as many execution slots as it has CPU cores. In our example we have two executors, both with four CPU cores; therefore they have four slots each, and therefore they can process four tasks in parallel. The driver sends the tasks to the executors, and it also sends information about which executor should analyze which part of the data. The executors then grab that data, exchange information between each other if there is such a need, and send the results to the driver, or save them to files, or send them somewhere else; the results do not always have to go back to the driver. So that's how things look at a very high level.
Let's start by looking at some examples, and after that we will jump into Databricks and see how exactly the same example works in the Spark UI. Let's say we have an application which wants to count the number of records in our dataset; the dataset is represented by the blue bars over here, and we have millions and millions of rows. Our application communicates with the driver through a SparkSession. The driver takes that application and analyzes the optimal way of executing it; it also analyzes the data: how much data we want to count, how many executors we have, and what is the most efficient way not only to execute the application but also to distribute that dataset to the executors. So it looks at the data and decides how it should be split into something called partitions. Next, it sends the tasks to the executors to grab different parts of the data and assigns a task to each and every slot. In slots one, two, and three we have a similar amount of data; in slot number four we have less data, which is a normal situation. Now the first task is to do a count locally on each slot, so the executors start to execute the tasks, and it may happen, as in this case, that slot number four finishes first because it has the smallest amount of data, then slot number one, slot number two, and slot number three. In this case all the tasks were successful. It can happen that some of the tasks fail and the driver needs to reassign a task to another slot, but in our case
all the tasks were successful, and we are halfway done: we have local counts, but we don't have a final number. With the next tasks which the driver sends, it assigns one executor to gather the partial results from all other slots and all other executors, like in this example, and another task it sends to this executor is to perform the final count, and that will be the final number. Then, depending on the application, the instruction may be to send the data somewhere or save it to a file, in which case there is no need to communicate the results to the driver, or the instruction may be "give me the number, because I want to display it to the user"; in such a case there will be additional communication happening between the executor and the driver, and the driver, in our case, will just display the result to the user.

Let's discuss the big elephant in the room. During the examples we will be using something called the Spark UI, and we will be using it a lot.
The Spark UI is a really powerful tool, available by default on port 4040, which allows us to understand many things happening under the hood and to optimize our workloads. The thing is that it's not really modern looking and not really super intuitive from day one, but for one or another crazy reason, once you start using it, playing with it, and trying to understand what's displayed there, sooner or later it will become second nature. So if you don't get it at the beginning, absolutely don't worry: try to open it on your side, follow along with me, and you will be good.

Okay, so we are now in Databricks, in our first notebook, where we are going to start exploring how Apache Spark architecture really works. First things first, we are using a small cluster with two workers; each worker has 8 GB of memory and four cores, so in total we have 16 GB of memory and eight cores, and if we have eight cores it means we have eight slots. The driver is the same: 8 GB and four cores. At the beginning of each notebook we are going to have a small setup; this is nothing you would want in production, but when you are experimenting you may want it. We are disabling caching of data on the storage or on the worker nodes, for the sake of consistency of each and every example we'll be showing: we want every example to start fresh, so at the beginning we disable caching.
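The exact setup cell isn't shown in the recording; a minimal sketch of what such a cell could look like, assuming the Databricks disk cache is the caching being turned off, is:

```python
# Hypothetical setup cell: turn off the Databricks disk cache on the worker nodes
# so every example re-reads its data from storage and starts fresh.
spark.conf.set("spark.databricks.io.cache.enabled", "false")

# If a previous run cached any DataFrames in memory, drop them as well.
spark.catalog.clearCache()
```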
The second thing is our sample data for this notebook. In each notebook we will be using fairly similar sample data, which is available in every Databricks instance: it's example data about New York City taxis and their trips, and this is how it looks. We have multiple files, each of a certain size, and the reason I like this dataset is, first of all, that it's available in all Databricks instances, and second, that the files are fairly significant in size; in this case we have a file from 2009 and it's 465 MB. So it's a good dataset for experimenting with performance and with how different functions behave.

Let's look at our first count example, on the 2009 data. Here we are using Spark to read all the CSV files from that location which are for the year 2009, so in total we are reading approximately 5 GB of data, and then we want Spark to count it.
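The notebook cell itself isn't reproduced in the transcript; a minimal PySpark sketch of this kind of query, with a placeholder path to the Databricks sample dataset, would be:

```python
# Read every 2009 CSV file from the sample dataset (path is illustrative) ...
df = (
    spark.read
    .option("header", "true")
    .csv("dbfs:/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2009-*.csv.gz")
)

# ... and ask Spark to count the rows. count() is an action, so this is what
# actually triggers the jobs we are about to inspect in the Spark UI.
print(df.count())
```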
So let me run it. What we see is that Spark starts something called a job; we will talk about jobs, as well as stages and tasks, in detail later on, but we can already see that this is job number zero, the first job (the numbering starts from zero), and it has one stage, also called stage zero. In that stage we have one task to run, and so far zero have completed. Once it completes, Spark starts a second job, also with one stage, but here we have 12 tasks, and eight are running in parallel because we have eight slots. As we can see, some tasks have already completed; there can be basically two reasons for that: either the partitions assigned to those tasks are smaller than for the rest of the tasks, or that slot is simply faster. So eight tasks completed, four are running, and now we see that all 12 tasks have completed. There was one more additional job triggered by Spark, with only one task, but it was very fast. Now we can investigate further details in the Spark UI. There are multiple ways to get into the Spark UI in Databricks: you can go there by clicking these views, or you can go and
click on your cluster, where you have a link to the Spark UI. We are going to have the Spark UI always open in a second tab so we can quickly switch to it. What we see by default is a number of tabs on the top, like Jobs, Stages, and so on; the three most interesting for us in the next couple of examples will be Jobs, Stages, and SQL / DataFrame. By default I'm in the Jobs tab, and here, for the time being, I see three jobs: 2, 1, and 0. Those are the jobs which were visible over here: jobs 2, 1, and 0. From here we can already see some details about those jobs, namely their duration; we see that job number one took the most time, about a minute. We see the number of stages in each of those jobs, and we see the number of tasks in each job; this was actually the only job with more than one task, so it would be nice to investigate further details about it.
Usually we will focus on the job which took the most time, and in our case it's this one. We can do that very easily by clicking its description over here, which takes us to a slightly different view where we see something called the DAG visualization. Again, this will be explained in detail later on, but it already gives us some idea of what was happening in that job: at the beginning there was a scan of the CSV files, then there was some computation (later on it will be explained what WholeStageCodegen means), and then there was an exchange. Whenever you see an exchange in the DAG visualization, it means there was data shuffling happening between the executors; exchange always stands for data shuffling. If we scroll down we see the stage, so we can also click on that stage and it will take us to further details. Now, as you can see, the tab on the top has changed to Stages, and we see that the total time across all tasks was 6.9 minutes, and the input size, so how much data we read: in our small application we read 5.7 GB of data from storage. Here we have the DAG visualization again, not so important, but once we scroll down, this is super important: we get the visualization of our tasks over time. Each of these bars represents a task, and immediately from that view we can figure out that we have two workers and each worker has four CPUs. Here is how we can figure that out.
First of all, the workers are very easy to spot: we have here worker number zero with its IP address, and the second worker. In each worker, if we look closer, we have at most four tasks being executed in parallel. Here it's very obvious: one, two, three, four. The second worker is more interesting: one, two, three, four, and then other tasks are starting, but those tasks start only when a previous task finishes; this task started only when this one finished. So if we take a look over here, we have four tasks in parallel; at any given moment in time we are going to have four tasks running in parallel, unless we are finishing the computation, in which case of course there will be fewer tasks. This is a very interesting view and we'll be coming here pretty frequently. Once we scroll down further, we see more interesting data, this time the summary metrics for the 12 completed tasks; we will be interpreting this data frequently as well. If you don't follow at this stage, don't worry: in each and every example we'll be coming here and explaining different parts of everything you see on the screen. What I would like you to pay attention to is the input size / records: that's basically information about the size of the partitions, and we see that the smallest partition assigned to a task, the minimum, was around 457 MB, and the maximum, on the right, assigned to some task, was 540 MB. So just like in the example in the presentation, partitions may differ in size. If we scroll down further, we see the aggregated metrics per executor.
This is also very interesting: we see that the executor with ID zero executed four tasks, which was also visible here (here we had the four tasks), and the second executor executed eight tasks, and the total input read: here 2 GB, here almost 4 GB. So also very interesting. And here we have further details about each and every task: how big a partition it got, or how long it took. In our case all the tasks were successful; it may be that some task fails, and then the driver assigns that task and that partition to another executor, so we would actually have more tasks, but in our case everything was successful. Now let's go to another interesting tab, the one in the middle called SQL / DataFrame, where we see slightly higher-level, aggregated information about what happened. In our case we have two entries here, and on the right you have information about which jobs each entry belongs to and the duration, and we see that the job which took a long time
is over here, in the first entry. Once I click it, it takes us to essentially the same information, just presented a bit differently, and this time we read the visualization from the bottom. At the beginning there was a scan of the CSV files, with information about how much data we read; then, moving up, there was a hash aggregate, and that was, in our presentation, the first count happening on each slot. So here we are doing the first count, and as output records we got 12 records: 12 records because we had 12 tasks, and each of those tasks finished its job with one record. The next step is an exchange, and exchange, again, is data shuffling, moving data between the executors. So we see an exchange happening, with very little data being exchanged in this example, of course; later on we will have examples where a lot of data is moving. Then the final count happens over here, and the results are sent to the driver so that they can be displayed for us. So that's the whole idea.
We will be talking about that in detail shortly and explaining more things. Now, moving back to our notebook, let's run a second example where this time we read the data for 2009 and 2010, so one year more, and I will hit Ctrl+Enter to run it. The idea is the same: Spark triggers a first job, just like previously, which is very fast, with one task; then job number four, and this is where it gets interesting: we no longer have 12 tasks, this time the job has 24 tasks. So far zero completed, eight running, and the idea is the same: some tasks will finish earlier, some tasks will finish later; if some task fails, the total number of tasks will be 25 or more, but in our case, with so little data and such a simple computation, I don't expect anything to fail. And here it comes: this job is over, then there was a third job, also very fast with one task, and the results are displayed for us, meaning the data went through the driver so it could be displayed for the user. Let's go to the Spark UI to investigate jobs number three, four, and five, in the Jobs tab. We see that job number four is the job which took the most time, about 1.6 minutes, with 24 tasks. Let me click on the details: the DAG visualization looks exactly the same, because the work is basically the same, just the volume of data has changed slightly. Then clicking on stage number five takes us to that nice visualization of the tasks.
Right now we see that we have twice as many tasks, and each executor has way more work to be done. If we scroll down to the summary metrics for all those completed tasks, here things start to become interesting, because the difference in size between the smallest partition and the largest partition read is pretty significant: here we have around 300 MB and here we have over 500 MB, so the difference becomes really noticeable, as does the summary of all the tasks. For the sake of the exercise, let me go to the SQL / DataFrame tab to see how things look over there. The longest jobs are over here, so I'm clicking on the description, and we can see that the execution flow is actually the same: we are scanning CSV, just this time the volume of data we're reading from storage has almost doubled, then we are doing the partial count on each slot, just like in our presentation, and the output rows are 24 because we have 24 tasks; then
we do the data shuffling, exchanging a bit more data because there are a bit more tasks, then we do the final count and return the results to the driver. So exactly the same idea. As the last example in this notebook, just to show the contrast, let me read a single file, from December 2019, so significantly less data, but the structure of the application will be the same. The first job is very fast; then job number seven, take a look at that: in this case we have only one task. In the first example we had 12 tasks, then 24, but now we have only one. So let's take a look at the details of job number seven and stage number nine. Job number seven took 8 seconds, and going into the details we see stage number nine looking exactly the same; going into stage number nine, 8 seconds, that's the data which has been read, and if we scroll down we see that only one task was executed. That's because Apache Spark decided that the data is small enough that there is no point splitting it across multiple partitions; it decided it would be better to read it as one partition, assign it to one task, and get the job done, which is reflected over here: summary metrics of one completed task, 114 MB, and also over here there is only one task.

Okay, coming back to our notebook, that was the last example in this one. Key points to remember: the driver plans the jobs and makes sure those jobs are executed; if an executor fails, it takes its tasks and assigns them to another executor. And, what is important,
the driver almost never touches the data. There are cases where it does, but generally it avoids doing anything with the data, because otherwise it would very quickly become a bottleneck. The executors execute the tasks and report their status to the driver. There can be more than one executor on a worker node, but in the case of Databricks, as well as the majority of Spark implementations, it will be one to one: one executor on one worker node. There are as many slots on an executor as CPU cores, and an executor can execute as many tasks in parallel as it has slots. A task is the smallest unit of work; this is a frequent question on Databricks exams. So, for instance, if we have two workers, each with four CPU cores, that gives us eight tasks which can be processed in parallel on those executors. Data is split into partitions, one partition for one task, and the driver and executor processes run on the Java Virtual Machine. By now we have repeated many times that the driver and executors sit on separate virtual machines, which is not entirely true; there are other options.
If we think about our own computer, and then a data center or some virtual machines in the cloud, there are actually three deployment modes. Local mode is something that allows you to install the whole of Spark, driver and executor, on your own computer; this setup is mainly for playing around, testing, and debugging, or maybe you want to develop something in such a setup. The second deployment mode is where you actually have the driver on your computer and the executors in the cloud or in the data center; this setup is good for development, testing, and maybe interactive applications, but it requires a good network connection between your computer and the data center.
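As a hedged illustration of the local mode described above, this is roughly how you could spin up Spark entirely on your own machine in PySpark (the `local[*]` master string means "use all local cores as slots"; the app name is made up):

```python
from pyspark.sql import SparkSession

# Local deployment mode: driver and executor threads all live on this one machine.
# Good for playing around, testing, and debugging — not for production workloads.
spark = (
    SparkSession.builder
    .master("local[*]")            # use every local CPU core as an execution slot
    .appName("local-playground")   # hypothetical name
    .getOrCreate()
)

print(spark.sparkContext.defaultParallelism)   # how many tasks can run at once locally
```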
The third setup, which is actually the only one suitable for production workloads, is cluster mode, which we have been discussing so far, where the driver and executors sit in the data center or in the cloud. That's another frequent question on Databricks exams or Spark interviews.

Let's talk about RDDs, DataFrames, and Datasets. Some time ago, when you wanted to write an application which did something with data, that data would be read into something called an RDD, a resilient distributed dataset. Resilient means that it's immutable: once created, it cannot be changed. If we want to do something with the data, a new RDD is created, and this seemingly strange behavior helps Apache Spark achieve fault tolerance. Our application would then interact with the data, as we are going to see in a second, through the RDD API, which, first of all, wasn't the most intuitive and, second, wasn't the best optimized. Under the hood the driver would take that RDD, split it into partitions, and then assign tasks and partitions to the executors.
Since 2015 we have two new APIs available for us, called the structured APIs, or Spark SQL. One of them is the DataFrame, which we can use in all the languages; the other one, which we can use only in Scala, is the Dataset. These are basically higher-level APIs, meaning, first of all, that they are more intuitive, kind of SQL-like, which makes them easier and faster to use than RDDs, and second, whenever we use DataFrames and Datasets instead of RDDs, everything we write is optimized by the Catalyst Optimizer, and this is a big advantage over the RDD. We can still use the RDD API, but nowadays it's considered the low-level API: it gives us more control, but it's less intuitive and it's not optimized by Spark by default. We are going to see that in action.

We are now in the second notebook, about RDDs, DataFrames, and Datasets, as understanding these is important for understanding Apache Spark architecture.
The cluster is the same and many things are the same; what changes is the data. Our task for this notebook is to use all three of these, RDDs, DataFrames, and Datasets, and find, using each of them, the number of taxi trips per number of passengers. Our dataset is the same as previously, just that previously the files were compressed; to make it easy to read into the RDD, I made a copy of one of these files which is uncompressed and ends with .csv, and the data looks as follows. The logic behind it is not so important; the only thing worth paying attention to is the fourth column, called passenger count, with the number of people in the taxi per trip. Now let's go straight to the, oh sorry, first things first, I need to run the setup, and now let's go straight to the RDD. So this is reading, grouping, and counting the data using the RDD. At the beginning, this is the first difference: previously we were using a SparkSession, but in order to read the data into an RDD I need to use something called the SparkContext. Then I'm using the function textFile, where I'm specifying, of course, what I want to read, and then we see filters, maps, and reduceByKey. I'm filtering the rows and eliminating the row which contains the vendor name, because that's the header row, which I don't want to take into my calculations, so with RDDs I need to filter it out myself. Then, for each row, I'm splitting the data on commas, and with a map I take the fourth column and reduce by key, essentially to generate a group by; then I return the final results to a variable and print them.
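The exact notebook code isn't included in the transcript, but based on the description it is roughly the following PySpark sketch (the file path and variable names are illustrative):

```python
# Low-level RDD version: trips per passenger count.
rdd = spark.sparkContext.textFile("dbfs:/tmp/yellow_tripdata_2019-12.csv")  # hypothetical uncompressed copy

trip_counts = (
    rdd.filter(lambda line: "vendor_name" not in line)   # drop the header row ourselves
       .map(lambda line: line.split(","))                # split each row on commas
       .map(lambda cols: (cols[3], 1))                   # key = fourth column (passenger count)
       .reduceByKey(lambda a, b: a + b)                  # our hand-rolled "group by + count"
       .collect()                                        # action: bring results to the driver
)

for passengers, trips in trip_counts:
    print(passengers, trips)
```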
So let's press Ctrl+Enter and see how this looks in action. You can see that it's not really SQL-like; it requires you to learn new concepts when you want to use RDDs, like those filters and maps, which is more of a functional programming style, not SQL. But anyway, here comes the Spark job, and what we see is that, for the first time, we have one job with two stages. Previously, in all the other examples, we had multiple jobs and in each job mostly one stage; here we have one job with two stages, and in each stage we have five tasks. So let's take a look at how this looks in the Spark UI: job number 28. Let me open the Spark UI, and here I have job number 28, so let's take a look at the details.
We can see that the DAG visualization, which we will explain, is now showing two stages, and those two stages are connected, which means there is a data exchange happening between them. Now let's say I would like to look into further details using the SQL / DataFrame tab: here is a surprise waiting for me, there is no entry for job number 28. Our job was number 28 and there is no entry here, and the reason is obvious: the RDD is the lower-level API, not the higher-level API, and that tab is dedicated only to the higher-level, aka SQL, API. Meaning: whenever you use a DataFrame, a Dataset, or write SQL in Databricks, you will have an entry here, but if you use the RDD, which is the lower-level API, there will be no information about it over here. So the only information you can read about your application when you are using RDDs will be in the Jobs or Stages tabs, which is a natural consequence of Apache Spark architecture.

Let's get back to our notebook. So that's how reading and grouping the data looks with an RDD, and we see that it's not so intuitive. Now let's use a DataFrame, and with DataFrames it's a completely different story, basically day and night. First we read the data using the SparkSession, spark.read with an option, and here we read the header straight away, it's as simple as that, specifying where the data we want to read is located; then we do a SQL-like group by and count, which is fairly intuitive for everyone who knows SQL, and a show.
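Again the cell isn't quoted verbatim in the transcript; a minimal sketch of the DataFrame version it describes, with an illustrative path and column name, looks like this:

```python
# DataFrame version of the same question: trips per passenger count.
df = (
    spark.read
    .option("header", "true")                      # treat the first row as the header
    .csv("dbfs:/tmp/yellow_tripdata_2019-12.csv")   # same illustrative file as before
)

# SQL-like, declarative, and optimized by Catalyst under the hood.
df.groupBy("passenger_count").count().show()       # column name assumed from the file header
```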
Ctrl+Enter, and we can see that the application structure is immediately different. We get two jobs at this point, and each job has only one stage, this one with 16 tasks, and we also get a third job with one task, and that's the result. If we move to the SQL / DataFrame tab in the Spark UI, we see those jobs over here and we can investigate further details in the already familiar way. So that's basically what was happening: we were scanning CSV, doing the partial grouping, and exchanging the data. What is important to say is that whenever we exchange data, Spark repartitions the data being exchanged, and the default number of partitions Spark tries to apply is 200; but then a Spark optimization, which we'll discuss later on, kicked in and compressed the number of partitions down to one. It performs the final grouping and then the results go to the driver. So that's the whole DataFrame example. We see, first of all, that it's way more intuitive to read; secondly, we have this higher-level API data available in the SQL / DataFrame tab; and the third thing is that one of the reasons for having multiple jobs with one stage each here, versus one job with multiple stages in the case of the RDD, is that with DataFrames there is a lot of optimization happening, like the Catalyst Optimizer and others which we will discuss later on. So whatever we do with the DataFrame is much better optimized than when using the RDD; the RDD is harder to use and it's
less optimized. Now let's take a look at how things look with a Dataset. Datasets are available only in Scala, so I need to declare that from now on I'm going to use Scala as the language. Let me actually start from the second line: here I am defining a value for the dataset in the typical Scala way, and then I am reading in exactly the same way as with DataFrames: spark.read, option, CSV, and the path. So we can see that Spark is actually unified: no matter whether you are using a DataFrame or a Dataset, many things look exactly the same. Just at the end I need to define the data structure I'm expecting, for type-safety purposes, and that's the big difference. For that I need to define a case class, which everybody who knows Java or Scala is familiar with; it's typically a one-line definition of a class where you define a column and a type, a column and a type, and so on. So you need to define your whole data structure, and using that case class you can read the data in a type-safe way into the Dataset. Then, in exactly the same way as with DataFrames: group by, count, and show. What is important, again, is that this is a really SQL-like program. Ctrl+Enter, and we see that even the structure of the application is exactly the same: we get three jobs, with one stage here, one stage here, and in the third job we get two stages, one of which is skipped completely and the second one will
be executed, and here it comes, that's the result. These are jobs 32, 33, and 34; if we go to the Spark UI, and let me refresh that tab, because we were using the higher-level API we have the details over here, and everything else looks exactly the same as in the DataFrame case. Just because we were using the case class and defining what data structure we expect, we achieved type safety, and that's really the big difference. Okay, that was the last example for that notebook.

Key points to remember, guys: the first one is the RDD, which stands for resilient distributed dataset; it's an immutable, distributed collection of objects, which helps Apache Spark achieve fault tolerance and data lineage, and it supports a wide range of transformations and actions, it's just that the RDD API is not the most intuitive, as we have seen. The second data structure is called the DataFrame, inspired by pandas from Python; a DataFrame is a higher-level abstraction over the RDD, organizing data into named columns, similar to tables or a spreadsheet.
Whenever we use DataFrames, everything goes through the Spark Catalyst Optimizer, which is a big benefit. And the third data structure is called the Dataset; this combines the benefits of the RDD and the DataFrame, offering a strongly typed, object-oriented API, which is why we can use it only in Scala, and combines it with the optimization capabilities of the DataFrame.

Let's talk about transformations and actions. Basically, in Apache Spark there are two types of operations you can perform: it will be either a transformation or an action. A transformation can be understood as the business logic you want to apply to your data: it takes one or more RDDs, DataFrames, or Datasets and transforms them into new ones. We have already repeated multiple times that RDDs, DataFrames, and Datasets are immutable, so whenever we want to do something with the data, it creates a new one; the original is not changed. And a transformation, this is important, can be either narrow or wide. A wide transformation requires shuffling data between the partitions, meaning an exchange of information between the executors. Examples of narrow transformations are filter, select, creating a new column, or cast. So let's say we have
data distributed across multiple executors and we call the function filter. The executors don't have to exchange any information between themselves: it's enough for each executor to just filter the data it has in its own memory and return the results; there is no need to exchange data. On the other hand, examples of wide transformations are group by, sort, and join, and those require shuffling data. Again we can imagine data spread across multiple executors, and we call the function sort: in order to have the data properly sorted, it will not be enough for each executor to sort only the data it has and then return the results, because the data would only be partially sorted. There needs to be an exchange of data between the executors, there needs to be something called data shuffling, so we can sort all the data properly. And the last thing about transformations: transformations are lazy by nature, meaning Spark will not do anything unless it has to. On the other hand we have actions. An action is a demand for results: for example, whenever we call a function like collect, count, save, take, or show, this is basically information for Apache Spark that, okay, enough is enough, execute all the transformations plus the action I have asked you to do, and Spark will not wait any longer; it will execute everything we have asked Apache Spark to do in our application.
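To make the narrow/wide and lazy/eager distinction concrete, here is a small hedged sketch; the DataFrame `df` and the column names are assumed from the earlier taxi examples:

```python
from pyspark.sql import functions as F

# Narrow transformations: each executor can do these on its own partitions, no shuffle.
trips = df.filter(F.col("passenger_count") > 1).select("passenger_count", "trip_distance")

# Wide transformation: groupBy needs rows with the same key brought together, so it shuffles.
per_group = trips.groupBy("passenger_count").agg(F.avg("trip_distance"))

# Nothing has executed yet — transformations are lazy. Only an action triggers the jobs:
per_group.show()   # or .count(), .collect(), .write...
```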
We are now in the third notebook, about transformations and actions, which are another cornerstone of Apache Spark architecture. This is the shortest notebook we have: there is only one query here, just to demonstrate the difference between having only transformations and applying an action. Right now we are reading data which we already know, about New York City taxis, and we apply our first transformation, a filter; then we apply our second transformation, which is a select of two columns; and then we do a group by and some calculation. This could create the impression that we are actually calling for an action, but at this stage we are not telling Apache Spark what we want to do with the data, so if we run it, no computation will be kicked off. There will, however, be one small job over here, job number 47 with one stage, which is super fast; it took one second or less. This job is not about the computation: if we look inside that job we see that it is really just for Spark to figure out what data we are talking about. It's not even about reading the data, just about figuring out how to partition it. So let me close that, and for comparison, here we have three simple actions: show, to display rows of results on the screen; count, to count the records; and the third one is actually more interesting, it's about writing the data, but in this case we are not going to write anything anywhere, because we specify the format noop, which is sort of fake: it's information for Apache Spark that we are doing this just for testing purposes, and we don't want to save any data to storage.
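The noop sink is handy for exactly this kind of benchmarking; a hedged sketch of the three actions described above, reusing the assumed `per_group` DataFrame, could look like:

```python
# Three different actions, any one of which forces the lazy plan above to execute.
per_group.show()    # display a few rows on the screen
per_group.count()   # count the result records

# "Write" through the no-op data source: runs the full computation,
# lets us measure it, but never persists anything to storage.
per_group.write.format("noop").mode("overwrite").save()
```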
Let me uncomment one of them and the difference will be noticeable: previously it took less than one second, now it takes way more, and we can already see that we have more jobs. We have the same job as before, about figuring out the partitions, and then we actually have the heavy-duty job with the real computation, and now the whole application took 57 seconds, because only now has the data been read and the calculation been executed. That's it for that notebook.

Jobs, stages, and tasks. Every time we supply an application to the driver, the driver splits our application into one or more jobs. Jobs can run in parallel, but only under some circumstances; usually they run one after another. Inside each job we have one or more stages, and what is important to remember is that stage two can start only once stage one has finished. There are, again, some special cases where they can slightly overlap, but the general rule is that stage two starts after stage one finishes. Inside each stage we have one or more tasks, and we can run as many tasks in parallel as the executors have CPU cores, of course, which we already know. The reason stages happen one after another is that Apache Spark tries to compact all the transformations which do not require data shuffling into one stage, so all the narrow transformations like a filter or creating a new column; only when data shuffling is required is a new stage created. So data shuffling is the boundary between stages. Now, each job is represented by something called a directed acyclic graph, in short a DAG, and it's called that because it flows in only one direction, and in
this case, on the right, we have a very simple example of a DAG with two stages: stage number 10, then data shuffling happens, and stage number 11; we even have information here that an exchange is happening, so that is also a signal for us that data shuffling is happening here. Before seeing that in practice, let's discuss the laziness of Spark one more time. Let's say we have a very simple application which reads some data; already at that stage Spark will, or can, fire a very small job to figure out which data we are talking about: where the files are, what files we are talking about, how much data there is. And let's say that in our application we want to group this data by something, then perform another wide transformation, maybe a sort, maybe a join, then filter the data, and then we ask Spark to show us the results. Apache Spark is smart enough to identify this guy, the filter, and to figure out that it will be way more efficient to push it as far as possible to the left, and the reason is that by doing
that, Apache Spark can limit the data it reads: maybe the data is partitioned, maybe instead of a filter we are using a select and the data is in Parquet, so Apache Spark will read only specific columns, but basically Apache Spark tries to limit the data as much as possible so that the further transformations, the wide transformations, have less work to do. And then of course the show happens, an action, and Apache Spark fires the jobs, in our case job number one and a second job, job number two. This laziness of transformations is one of the principles of Apache Spark architecture.

We are now in the fourth notebook, jobs and laziness according to Apache Spark architecture, and this will be a combination of things we already know very well with some new stuff. Let's first talk about DAGs. Here we have a query from the previous notebook where we read data, apply a filter and a select, do a group by, and then an action, show. In our case the query has already been executed, and we can figure out part of the DAG already, starting from here: we see that
we have three jobs and four stages. If we go to the Spark UI and find, for instance, job 63, we can figure out the DAG over here: so 63, and that's the DAG for that job. If we remember the RDD, we didn't have multiple jobs, but one job with multiple stages, so we could see all the stages in one place; in the case of the DataFrame, like right now, we have different jobs, and if we want the holistic view, we need to go to the different jobs and figure out the different parts of the application over there, like going to job number 65 and figuring out this part of the DAG, or the DAG for this job over here. Another way of looking at the DAG is to go to the SQL / DataFrame tab; just this time we don't get exactly a DAG but something called a plan, and we already know the plan very well. It looks as follows, and the way we read it is from the bottom, because that's really the only way to do it properly. But if we scroll down we see a very
interesting thing here, under details. If we expand it, we see something that is a bit scary at the beginning but is actually fairly intuitive and useful: these are the post-execution details of what Spark has done. Under the physical plan we see what steps Apache Spark performed; we also see the initial plan, and the difference between them we will discuss in a moment when we talk about the Catalyst Optimizer. If we scroll down to the bottom we see further details about the plan. What we see here at the beginning is very useful, but we don't need to wait for the execution to happen: we can actually figure out this plan, and at the same time get a pretty good idea about the DAG, straight from the place where we are writing our code, and for that we don't need any action. If I comment out show, so we have only transformations, and use the function explain, Ctrl+Enter, I get, well, one simple job, because Spark is figuring out the data again, but then I get the physical plan as Spark believes it will look.
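A minimal hedged sketch of that kind of call, reusing the assumed query shape from the notebook (filter, select, group by on the taxi data), would be:

```python
from pyspark.sql import functions as F

query = (
    df.filter(F.col("passenger_count") == 1)      # narrow transformation
      .select("trip_distance", "payment_type")     # narrow transformation
      .groupBy("payment_type")                      # wide transformation -> exchange
      .agg(F.avg("trip_distance"))
)

# No action here: explain() only prints the physical plan Spark expects to run.
query.explain()
```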
The way to read it is from the bottom again. At the beginning we do the file scan, and we can see that Apache Spark tries to extract only the columns which will be needed for the computation. If we were using Parquet files or the Delta format as the dataset, Spark would actually succeed and read only those columns, but because we are reading CSV, Spark needs to read the whole dataset anyway. Then Spark applies our first filter over here, checking that the passenger count is not null and checking that the passenger count equals one, as we've written in the query. Then in the query we've written a select, and the same thing is happening over here, it's just called Project: trip distance and payment type. Then we see the first calculation, a hash aggregate, so the group by happening on the separate slots; we see the exchange, and we already know very well that exchange is about data shuffling; and then we see the final group by. That's the whole plan
as Spark sees it before actually executing the query. Of course it may happen, in the case of big, complex applications, that while the execution is happening Spark collects more data about the execution and decides to change the execution plan on the fly, but in the majority of cases this plan displayed by the explain function is actually fairly similar to what you can expect to see at the end of the execution. Let's now talk about laziness; I just need to change the comments over here. This is the same query as above, and it is going to give us exactly the same physical plan as before. Now what I want to do is add one more transformation over here, one more filter, to the whole query, and here comes the beauty of laziness: because Apache Spark is not executing anything every time we give it a transformation, like here, here, or here, but is waiting for an action, Spark actually catches the fact that we are doing a transformation which it can push closer to the source, and it optimizes the query for us. Let me hit Ctrl+Enter and see where this filter with payment type equal to cash will be pushed. Ctrl+Enter, and as we can see, instead of adding the filter at the end of the execution, Spark actually pushes that filter down and combines it with the same filter we had before about the passenger count: now we are filtering not only on passenger count not being null, but also on payment type not being null, and on passenger count equal to one and payment type equal to cash.
Apache Spark used the opportunity of being lazy, and before we hit it with any action, it optimized all the operations we expect it to do. Let me add one more thing to this: the explain function has a very useful parameter, True, and as you learn more and more about Spark you will use it pretty frequently. Let me keep the same query and press Ctrl+Enter. The difference is that now, besides the physical plan which I got before, I'm also getting the other, earlier plans. If I scroll to the top I see something called the parsed logical plan, meaning that is what Apache Spark would do if it weren't lazy: it would read the data, apply the first filter, apply the select, the group by, and the second filter at the top. That would happen if Apache Spark executed the query exactly as we asked it to, but thanks to the Catalyst Optimizer, which we will discuss later on, it was able to go through a couple of optimization stages and push the filter closer to the source.
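For reference, here is a hedged sketch of the extended call on the same assumed query (the cash value is illustrative); the comments summarize the sections of output that the recording walks through:

```python
from pyspark.sql import functions as F

# explain(True) prints all of the plans, not just the physical one:
#   == Parsed Logical Plan ==     the query exactly as we wrote it
#   == Analyzed Logical Plan ==   names and types resolved against the catalog
#   == Optimized Logical Plan ==  after Catalyst (e.g. the cash filter pushed down)
#   == Physical Plan ==           what will actually run on the executors
query.filter(F.col("payment_type") == "CASH").explain(True)
```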
That's it for that notebook. Data shuffling: why do we care about data shuffling? We talk about it quite a lot. Data shuffling is the process of redistributing data across partitions, and it typically involves data exchange between executor nodes; we already know that, and we know that wide transformations require shuffling, of course. For example, whenever we have a narrow transformation, and we can imagine that this is our cluster of executors, when we want to apply a transformation like a filter, which is narrow, each executor can take the data it already has in memory and just apply the filter; it doesn't need any other data from any other executors. In contrast, when we want to perform a wide transformation like a join, a sort, or a group by, then each executor, to perform that task properly, needs data from other executors, so we exchange data. The problem is this: the beauty of Spark is that it processes data in memory, that's another fundamental principle of Apache Spark architecture and that's why it's so fast, but whenever we talk about data shuffling we need to take that data from memory, save it to storage, send it over the wire, again to storage, and then read it into memory again, and those operations are significantly slower than pure in-memory processing. Sometimes you can avoid it, for instance by avoiding unnecessary sorts, optimizing some joins, or filtering data early on, but frequently, we shouldn't go crazy about data shuffling: we should be aware of it, we should be aware of where in our code it's happening, but sometimes we just will not avoid it. Before
jumping into Databricks, let's take a look at one more example. Let's say we want to group by and count our data, and different colors represent different categories of data. At the beginning the driver decides how to split the data into partitions; then each executor grabs its part of the data and performs a local group by and count, but shortly the executors hit a wall, and in order to continue they need to do a data shuffle. So a new stage is required, the data is exchanged, and only then are the executors able to continue with their tasks and perform the final counts, and here come the results.

We are in another notebook, this one about data shuffling, and the reason we are here, well, actually there are three reasons: first of all, to see how you can recognize data shuffling in the Apache Spark UI; the second reason is to see the impact of data shuffling on performance; and the third is to see that data shuffling does not depend only on the type of transformation we want to apply to the data
but also on the data itself, so knowing your data is always a good thing. What is different is that most of the queries in this notebook have already been executed by me, because they take a bit of time and I want to keep things moving. The data we are using is data we already know and like, about New York City taxis; this is how it looks, and we will use a couple of columns, like pickup datetime, payment type, and fare amount. The first query is the following: we are reading the data for two years, so we get around 5 GB, and then we do a very simple group by on payment type and a count. If we take a closer look at payment type, we see that there are only two options over here: someone paid either by credit or by cash. This type of column is called a low-cardinality column, because there are very few distinct values.
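A hedged sketch of this first query, with illustrative paths and column names matching the description (group by the low-cardinality payment type):

```python
paths = [  # illustrative locations of the two years of sample data
    "dbfs:/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2009-*.csv.gz",
    "dbfs:/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2010-*.csv.gz",
]
two_years = spark.read.option("header", "true").csv(paths)

# Wide transformation on a low-cardinality column: a shuffle happens,
# but only a tiny amount of data (one partial count per group per task) is exchanged.
(
    two_years.groupBy("payment_type").count()
    .write.format("noop").mode("overwrite").save()   # fake write, just to trigger execution
)
```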
If we run explain we see no surprises here: we do the file scan, then the first group by, then an exchange, and every time we see an exchange we already know what it means, data shuffling, and then the final group by. After executing the query, that's the query execution: the action is writing the data, but it's a fake write because the format is noop. These are the results, we have three jobs, and we can go to the Spark UI and take a closer look at those jobs and how to recognize the data shuffling; they were 3, 4, and 5. If we check the execution time, job number four was the one which took the most time, so let's go and see its details. Once I scroll down, I can see over here on the right Shuffle Write: this is basically the information about how much data has been written to storage, to the drive, for the purpose of shuffling. This time it's not a big number, it's 125 kilobytes. We can see further details by clicking on the stage description and scrolling down; we also have statistics on what was the minimum volume of data written by a single task to
storage, and what was the biggest volume written to storage by a task: 8.6 kilobytes, so a bit more. If we scroll down we can also see the details per task: that was the task which wrote the most, and if I sort in the other direction, this one wrote way less. Okay, let me keep that tab open and scroll down to the second query. Here we have exactly the same query; what is different is the column on which we do the group by: previously we were grouping by payment type, now we are grouping by fare amount, and if we look at the data, this is a higher-cardinality column, because we have way more distinct values here than for payment type. Again checking the execution of that query, jobs 6, 7, and 8: let me go to the Spark UI and find those jobs, 6, 7, 8. Job number seven was the one which took the most time; let's see the details, and we see the shuffle write is nothing big, 1,044 kilobytes, and going into the details of the stage
we will probably see some small change over here: the minimum was 40 kilobytes written by a task and the maximum was 50. Not a big deal; if we compare it with the previous one, well, it's more or less the same, we are still talking about a very small amount of data written to disk, nothing that really impacts performance. But let me now get back to the notebook and run exactly the same query, just this time using the pickup datetime. If we check the data, pickup datetime is somewhere on the left; we see that this is a really high-cardinality column: there will be very, very few values which repeat, like this one, okay, this one appears twice, but usually each and every value appears only once. So this is the high-cardinality column. Going back to our query, as the result we got jobs number 9, 10, and 11; let's find them in the Spark UI, 9, 10, and 11. Job 10 was the one which took the most time, so let's go
into the details. Scrolling down, we see that this time we've written way more data to storage, to the drive: before we were talking about 1 megabyte, now we have 673 megabytes, and this is a significant value; this is the shuffling we want to be aware of. So let's go into the details of the stage and scroll down. First of all, we notice that some tasks have a yellow bar, and the yellow bar, according to the legend, is the shuffle write time. This is an indication for us that each task does not only spend time doing the calculations, but the tasks have actually started to spend time writing data to storage for the purpose of exchanging it, so we start to see the impact. If we scroll down, we now have a pretty significant amount of data written to storage by each and every task: 24 MB over here and, on the right, 29 MB. If we compare it with the previous one, previously we were talking about kilobytes and now we are talking about megabytes, and here we have an example of shuffling which is actually already having an impact on performance.

Let's get back to the notebook. I have one more query, the last one in this notebook, and it is sorting; sorting should basically be prohibited in data engineering work. What we do here is read the same data and then order by trip distance, and if I hit explain, what we see is a similar plan: file scan, exchange, and sort.
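A hedged sketch of that last query, reusing the assumed `two_years` DataFrame from above:

```python
# A global sort is a wide transformation: every row may have to move to a different
# partition, so Spark writes large shuffle files before it can produce sorted output.
(
    two_years.orderBy("trip_distance")
    .write.format("noop").mode("overwrite").save()   # trigger execution without persisting
)
```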
If I scroll down to the query results, that's how it looks: it took 40 minutes, 40 minutes to order it, so whenever we can avoid sorting data, that's something we should do. Let's go and see the details of jobs 13, 14, 15, and 16; we actually see three jobs which took a significant amount of time, so let's check out the one which took the most, 16 minutes over here. Let me click it and scroll down: this job alone wrote more data to storage than we read from the files; for the purpose of shuffling it wrote 37 GB, so you see how heavy the shuffling can get. Let's get into the details out of curiosity, to see how it looks, and if I scroll down, well, we see that the tasks actually spent more time writing data to storage for the purpose of shuffling than on the computation itself, which is pretty impressive. If I scroll down further, I see that the task which wrote the smallest amount of data already wrote over a gigabyte, and the one which wrote the most wrote
almost 2 gbt let's do one more thing and see how we can check the same in SQL data frame Tab and 13 14 and 16 that's the one which is interesting duration 30 minutes okay so that was our execution plan and if I go and check let's see the exchange over here and we see that shuffle byes written total 37 gab and the total number of time the task spent on writing that the drive over 1 hour so you see that how you can think of a kind of simple computation like the order bu can
Before I close that notebook, let me one more time pay attention to the number of partitions. We usually talk about partitioning of the data twice in Apache Spark. First, when Apache Spark reads data from the storage: we have talked multiple times about how the driver checks out the data and decides how to split it into partitions. But there is also a second moment when we frequently talk about
partitioning of the data: when data shuffling happens. The default number of partitions when data is shuffled is 200, meaning that whenever we have data shuffling, Apache Spark will try to divide the data into 200 partitions and distribute it. And here is the introduction to the next part, which is about optimization: we are going to see a mechanism which decides on the fly what the optimal number of partitions is, instead of keeping it at 200 all the time. In our case it decided that four is the optimal number of partitions for this data shuffling, so it decreased the number of partitions; but more about that in the next notebook.
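That default lives in a single configuration value; as a quick sketch, you can check and override it per session like this:

    print(spark.conf.get("spark.sql.shuffle.partitions"))  # "200" unless it has been changed
    spark.conf.set("spark.sql.shuffle.partitions", 64)     # subsequent shuffles will target 64 partitions instead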
Let's get back to the slides and sum everything up. We already know how to check data shuffling in the Apache Spark UI, and we saw the impact on the performance, which can be tremendous even with simple operations like a group by; a similar situation can come up with a merge (join) operation. We also saw that data shuffling does not only depend on the transformation but also on the type
of the data, by executing the same query three times and just changing the column from a low-cardinality column to a high-cardinality one. Key points to remember: shuffling is a process of redistributing data across partitions and typically involves data exchange between the executor nodes; it requires saving data to the disk, sending it over the wire, writing it to disk again, and only then can it be read into memory, which of course is slow. Data shuffling can be very expensive, as we have seen. Sometimes it can be mitigated or even avoided by a code change, like for instance avoiding sorting the data; nevertheless, it is often a necessary evil.
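One example of such a code change, as a hedged sketch that is not taken from the notebook: when a join has one small side, a broadcast hint avoids shuffling the large table at all. The paths, table and column names below (a trips table joined to a taxi zone lookup) are purely illustrative:

    from pyspark.sql.functions import broadcast

    trips = spark.read.parquet("/data/yellow_tripdata_parquet")       # hypothetical paths
    zones = spark.read.csv("/data/taxi_zone_lookup.csv", header=True)

    # Without a hint Spark may pick a sort-merge join and shuffle both sides;
    # broadcast() ships the small lookup table to every executor instead.
    joined = trips.join(broadcast(zones), trips["PULocationID"] == zones["LocationID"])
    joined.explain()  # look for BroadcastHashJoin instead of SortMergeJoin + Exchange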
Ladies and gentlemen, let's touch on the optimization topic in Apache Spark. Optimization in the Apache Spark architecture is a massive subject, and you as the user can do a lot to optimize your flows; that will be discussed in separate videos, including choosing the right format to store your data, like the Parquet or Delta format instead of CSV (especially Delta, which is way more optimized than CSV), avoiding expensive operations like sort, which as we already know generates
massive data shuffling, minimizing the volume of data, caching DataFrames, repartitioning, avoiding what are called user-defined functions (again, a separate video about that will come), partitioning or indexing data, bucketing, optimizing the cluster itself, and many more things which we will discuss in detail. Nevertheless, what is important at this stage is to understand that the Apache Spark architecture is designed to do a lot of things for us already, including the Catalyst Optimizer, which we have mentioned a couple of times already, but also something new called Tungsten, which we haven't touched on yet but will say a couple of words about, and adaptive query execution, which also works for us a bit in the background.
Talking about the Catalyst Optimizer: whenever we have a SQL query, or some code using the DataFrame or Dataset API, the first thing that gets generated is called the unresolved logical plan, and the first step for the Catalyst Optimizer is to check whether the tables, columns and functions we are using in our query or code are actually registered in the catalog. If they are not registered in the catalog we will get an
error; if they exist, the logical plan is generated. Then the Catalyst Optimizer does a couple of things for us, like constant folding (if we have a constant expression in the code, it replaces it with its value), predicate pushdown and projection pruning, meaning pushing filters and selects as early as possible to minimize the volume of data being read, and it generates the optimized logical plan. From the optimized logical plan it generates different possible ways to execute that plan, including different strategies to perform joins, and then it calculates a cost model for each of them, checking how much
data would be read with each physical plan, how much network movement would be done, or how complex the computations are. So it calculates the cost model for each physical plan and then selects the most optimal one, and voilà.
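You can look at those stages yourself from any DataFrame; a minimal sketch (the path and column name are again assumptions):

    df = spark.read.csv("/data/yellow_tripdata.csv", header=True, inferSchema=True)  # hypothetical path
    # explain(True) prints the parsed (unresolved) logical plan, the analyzed logical plan,
    # the optimized logical plan and finally the selected physical plan.
    df.filter("fare_amount > 10").select("fare_amount").explain(True)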
Tungsten is a bit of a different beast. The goal of project Tungsten is to improve Spark execution by optimizing Spark jobs for CPU and memory efficiency. The Catalyst Optimizer, as we have seen before, generates an optimized execution plan; Tungsten, on the other hand, is used to generate optimized code, using whole-stage code generation (which we have seen many
times in the execution plans already), and to improve how the data is handled in the cluster. So compared to the Catalyst Optimizer, Tungsten's optimizations are just way closer to the bare metal. And the last bit of theory before jumping into Databricks and seeing things in action is called adaptive query execution. This is actually a very interesting optimization approach, which makes sure that your application is not optimized only once at the beginning: as your application runs and Spark collects more and more information about the data, about the operations, about the cluster behaviour, it reruns
some elements and recalculates the optimizations, including the Catalyst Optimizer. So the Catalyst Optimizer will not be run only once at the beginning of your application; if the application is long enough, it will be run multiple times. That's why we sometimes get multiple jobs where each job has only one stage: after that one stage, adaptive query execution was triggered, the Catalyst Optimizer recalculated everything once again, and that created a new job (we are going to see that in a second). This involves dynamically changing sort-merge joins into broadcast hash joins, dynamically decreasing the number of partitions, dynamically handling skews, and dynamically detecting and propagating empty relations.
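Those runtime behaviours sit behind a handful of configuration flags; a sketch of the main ones (they are enabled by default in recent Spark 3.x releases):

    spark.conf.set("spark.sql.adaptive.enabled", "true")                     # master switch for adaptive query execution
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  # merge small shuffle partitions at runtime
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")            # split skewed partitions during joins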
And we are in the optimization notebook, where we are going to see in action the three main optimization mechanisms in the Apache Spark architecture: the Catalyst Optimizer, adaptive query execution and the Tungsten project. Not all the details, but enough so we can understand the concepts. Starting with the Catalyst Optimizer: we have spoken about it quite a bit already, we know about its lazy evaluation feature and we saw it in action; here we have another of its features in action, which
is called predicate pushdown, or more precisely in this case projection (column) pruning. Already here, before executing the query, Apache Spark has figured out that it would be way better for everybody if we could read from the storage only the data about the fare amount, because nothing else is being used, and the example of how this happens is over here: Apache Spark is trying to read from our files only the fare amount column. Just, in this case we are keeping the data in CSV, and CSV is a row-based format, so there is no real
option to read a specific column. That's why it is way better to keep the data in Parquet or in the Delta format, which is essentially Parquet underneath; both of them are column-based formats, and in such a case Apache Spark can actually read only the specific columns it needs for the computation, like over here.
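As a hedged illustration (paths made up, column name assumed), converting the CSV to Parquet once makes that column pruning visible in the plan:

    # One-off conversion of the CSV data to a columnar format.
    spark.read.csv("/data/yellow_tripdata.csv", header=True, inferSchema=True) \
         .write.mode("overwrite").parquet("/data/yellow_tripdata_parquet")

    # On the Parquet copy, the FileScan's ReadSchema should list only fare_amount,
    # instead of every column in the file.
    spark.read.parquet("/data/yellow_tripdata_parquet").select("fare_amount").explain()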
Moving forward: adaptive query execution. Let's turn it off at the beginning; we can turn it off, just for the purpose of the demonstration (in production there is no reason to do that), by setting the configuration variable spark.sql.adaptive.enabled to false.
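In PySpark that is a one-line toggle (sketch):

    # Demo only: disable adaptive query execution for this session, then re-enable it afterwards.
    spark.conf.set("spark.sql.adaptive.enabled", "false")
    # ... run the query, inspect the Spark UI ...
    spark.conf.set("spark.sql.adaptive.enabled", "true")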
If we run our query, this is what we are going to get, and I'm curious whether you are going to spot the difference, because we were running that query already before. We got two jobs, 18 and 19; let's see what those look like. So, job 18 and job 19, it's over here: the one which took the most of the time was job number 19, and looking at the details, for the first time we can see two stages in the job connected with each other, stage number 23 and stage number
24. So we are scanning the file, we have the whole-stage code generation, which is the result of the Tungsten project (we will talk about it in a second), then we have the exchange, and we are reading the data and doing another computation. If I scroll down I see the details for both of those stages. Below we have stage number 23, and we see that we have read around 11 GB of data; then we see that we have a shuffle write, so we've written around 1 megabyte of data to the disk, and then the
second stage starts, and we have also read around 1 megabyte of data: so we've written it and we read it in the next stage. What is interesting is that we have 200 tasks over here, so going into the details of stage number 24 and seeing the 200 tasks, that's how it looks; we have all the colors of the rainbow, and those are our 200 tasks. The reason for having 200 tasks is that the data is by default partitioned into 200 partitions when it is shuffled, and without adaptive query
execution there is no mechanism in place to consolidate those and to ask Apache Spark to reduce that number. If we look at how the tasks spend their time: we have orange, which here is the shuffle write time; we have green, which we already saw before; we have some blue, the scheduler delay; and we have some more orange, which is the task deserialization time. So you see what happens without the adaptive query execution mechanism turned on, which is a pretty recent mechanism. If we scroll down we see all the details
for all those tasks. We can also go to the SQL / DataFrame tab, find the same job number 19, which is over here, and check how it looks there. What is different is that we no longer have the block which was representing the action taken by adaptive query execution: we are doing the exchange, we are partitioning the data into 200 partitions, but there is again no mechanism to decrease the number of partitions. If we take as a comparison the previous query, where adaptive query execution was on, like this one,
we are going to see that we had the exchange, we had 200 partitions over here, but then we had the adaptive query execution shuffle read which collapsed that into four partitions. So that is one of the big differences. Another difference becomes visible once we turn it back on, so let's set spark.sql.adaptive.enabled to true and run exactly the same query one more time. You can probably already see the next difference: before we had two jobs, 18 and 19, and here we have three jobs. The reason is that adaptive query execution, as the
name suggests, is something that is used by Apache Spark during the runtime, which means it is no longer run just once, like the Catalyst Optimizer at the beginning of the query, but is run between the jobs, and it can trigger the Catalyst Optimizer one more time or collapse the number of partitions, and that is what happens over here: as we can already see, there is no stage with 200 partitions. Checking jobs 20, 21 and 22, let's get back to the SQL / DataFrame tab, 21 and 22, that's the one, and we can see exactly
the same schema, just this time with the partitions collapsed from 200 down to one. In our case we are not using a massive volume of data, so the difference in performance may not be so visible, but in production, if you try turning it off, you will quickly learn to appreciate it. Okay, let's now talk about the Tungsten project. Tungsten, as we've said, is way closer to the bare metal: the purpose of the Tungsten project is to introduce a set of optimizations into the Apache Spark architecture related to memory usage and CPU usage.
One example of Tungsten in action we will see if we turn it off; we can turn off different parts of it, and here we are turning off the optimization which is about generating optimized code for each and every task.
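The flag being flipped here is most likely the whole-stage code generation switch; as a sketch (for demonstration only, and assuming that is indeed the setting used):

    # Disable whole-stage code generation: operators are no longer fused into
    # generated Java code, and WholeStageCodegen nodes disappear from the physical plan.
    spark.conf.set("spark.sql.codegen.wholeStage", "false")
    # Remember to set it back to "true" after the experiment.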
So if we run exactly the same query without it turned on, and we go and check jobs 23, 24 and 25 (let's open this one more time: jobs 23, 24 and 25), taking as usual the one which took the most of the time, what we are going to see is that we no longer have whole-stage code generation here; instead we have a hash aggregate. If we compare it with the previous jobs, like this one, we can remember that previously we always had the whole-stage codegen. It means that when the Tungsten project is on, it generates optimized code for each and every task on the fly, instead of using generic functions like over here. But again, the benefits of all those three optimization mechanisms in the Apache Spark architecture we will only really appreciate once our data gets
big. And we are at the end of the introduction to the Apache Spark architecture. Congratulations, we have learned a lot of new concepts, and some of them were not easy. The most important next step is to practice: every time you write a query, try to open the Spark UI and figure out what is happening under the hood. In the meantime, don't forget to subscribe to the channel, because there is a lot of new cool content coming shortly. See you soon.