Realtime Data Streaming | End To End Data Engineering Project

252.98k views · 10,883 words
CodeWithYu
In this video, you will be building a real-time data streaming pipeline, covering each phase from da...
Video Transcript:
Hello and welcome back to the channel. Today's video is for people who are passionate about data engineering: if you're looking to build scalable and robust systems, then today's video is just for you. Before we continue, don't forget to subscribe to the channel and ring the notification bell so you don't miss out on any future videos. In today's episode I'll be walking you through an end-to-end data engineering project that stitches together several essential technologies, including Apache Airflow, Apache ZooKeeper, Kafka, Cassandra and PostgreSQL, all containerized with Docker. So let's dive right in and get started.
Let's take a look at the high-level view of the system architecture. We start with an API, the Random User Generator API, which is used for generating random user data, a bit like lorem ipsum but for people: none of this data is real, and these are not real people. The docs show how to use the API and the kind of results we can expect from it. Going back to the architecture, we're going to have a DAG in Apache Airflow that fetches data from this API intermittently. The fetched data will be streamed into a Kafka queue; Apache Airflow, with our configuration, will be running on PostgreSQL. The data we get from this API is streamed into Kafka, which sits on Apache ZooKeeper, and ZooKeeper is the manager for the multiple brokers we have in Kafka. So if we have multiple brokers, say three or five Kafka brokers, ZooKeeper is the manager: when one goes down, another one replaces it and the process restarts. The data inside the Kafka brokers will then be visualized in our Control Center. The Control Center serves as a UI where we can see what is going on in our Kafka brokers: the number of messages coming into the different topics, and the different topics on the Kafka queue. The Schema Registry provides a serving layer for the metadata; it is a RESTful interface for storing and retrieving Avro schemas, which is particularly useful when the Kafka streams are being visualized, so we can understand the schema of the records, all the data coming into Kafka. The data we get from Kafka will then be streamed with Apache Spark. We have a master-worker architecture set up on Spark: when a job is submitted to the master, the master decides which worker takes up the job and runs the task. The task in this case is to stream data into Cassandra, so we have a listener that gets data from Kafka into Spark and then streams it directly into Cassandra. All of this architecture runs on Docker containers, and we're going to have a single docker-compose file that spins the whole thing up. That's the basic architecture we have, so we can dive right into the first part: getting data from the Random User API with a DAG. I'm going to start by creating a new project called data-engineering. I'm using Python 3.11; you can use any Python version, though I think the minimum supported version is probably 3.9. In my case I'll be using 3.11.
In this environment we have a main.py. We're going to be using the terminal a lot, so it's best to make sure it's properly set up: I'll start by deactivating the current virtual environment and then sourcing it again, and I'm going to increase the editor font size to 20 so it's visible to everybody. With that, if I do python main.py it prints as expected, so the environment is properly set up, and if I check, which python points at the right directory. Good. I don't need main.py, so I'm going to delete it, and I'll create a new folder called dags. Because we've activated our venv, we need to install the base package we'll be using: pip install apache-airflow. This gives us Airflow, so we can start from the DAG connected to the API in the architecture and spin up that instance in a little bit.
Now that the installation is done, I'm going to create a new file in our dags folder and call it kafka_stream.py. This file is where we're going to define and run the DAG from, so I'll import some packages from Airflow and some standard ones: from datetime import datetime, then from airflow import DAG, and the other thing I'll import from Airflow is the PythonOperator (from airflow.operators.python), which I'll use to fetch the data. Now I need a default arguments dictionary, default_args, that I'll attach to the DAG itself, so we know who owns the project, plus a few other properties. The owner is just going to be my handle, and the start_date uses the datetime I just imported: September 3, 2023 at 10 o'clock; it doesn't really matter exactly what time it says. Now that I have my default_args, I'm going to create the DAG itself, which serves as the entry point. I'll use with DAG(...), give it the ID user_automation, set default_args to default_args, set the schedule_interval to daily, and since there's no need for catch-up I'll set catchup=False. This is what will run as our DAG. Inside it we're going to have a PythonOperator; I'll call it streaming_task, with the task_id stream_data_from_api. With the PythonOperator we need the python_callable, which is the function it will call: I'll name it stream_data.
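As a minimal sketch, the DAG described so far looks something like this, assuming Airflow 2.x-style imports; the owner string is a placeholder, and stream_data is filled in over the rest of the walkthrough:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'airscholar',  # placeholder owner name
    'start_date': datetime(2023, 9, 3, 10, 0),
}

def stream_data():
    # implemented later in the walkthrough
    pass

with DAG('user_automation',
         default_args=default_args,
         schedule_interval='@daily',
         catchup=False) as dag:

    streaming_task = PythonOperator(
        task_id='stream_data_from_api',
        python_callable=stream_data,
    )
```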
We don't have stream_data yet, so we need to create it. I'm going to import json, because that's how I'll be formatting the response, and I need to import requests to get the data from the API itself. So I'll have a requests.get: I go to the API page, copy the URL, and paste it in here as my endpoint; there are no other parameters to set, so that's fine. I save the response, call .json() on it, and print it from stream_data, so we can test this before running the DAG itself. If I clear the terminal and run python dags/kafka_stream.py, it should get me the data from the API. I'll comment out the operator part for now and run it again. Good, we have the results from the API, and as expected the response comes as an object with an array of data: the first part is results and the second part is info. We don't need info for now; we only need results, and within that only the first record, the large JSON object. To put that in context, I take the response, get the results part of the data, and take the first index: res = res['results'][0]. If I print that and run it again, I get the gender and the rest, and I think we can still format it more nicely, can't we? We can do json.dumps on the response with an indent of, say, 3; if we run it again we have something laid out prettily for us: the gender (male), the location, email, login ID and all that, which is good. I think this is a good step forward; before we continue we want to make sure each piece is fine. So I have stream_data, but this is not exactly how I want it structured, so I'm going to separate it a little. I'll add a get_data function, move all the fetching code into it, and return the response at that point, because that's all I need there: call the API, parse the response as JSON, and take the first index of the results. That's what the get_data function does.
Meanwhile, back in stream_data, I need to put this data into a specific format, because that's what I want to put on my Kafka queue. So I'm going to have a format_data function that takes the response we get from the API, and inside it a dictionary called data where I'll hold all the fields I care about. If I keep the raw response open on the side I can see all the data; for example the gender is male. So what I need to do is extract this data field by field. I'll start with the first name: in the response there is a name object with the first and the last name, so data['first_name'] comes from res['name']['first']. Then in stream_data I set res = format_data(res) and run it again to see if I get the right result. It complained at first, but that was just a typo (I'd written the wrong variable name instead of res). Now we have much more streamlined data, closer to what we need: we have the first name of this person, and if you look at the raw record there is also the picture of the user, which is fine; that's what we need at this moment. The next thing we need to do is add a Kafka producer to this, but before we do that we need to set up our Kafka stack; this part was just a way for us to get access to the API.
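Here is a sketch of that flattening step; only a handful of representative fields are shown, and the exact set kept in the video may differ:

```python
def format_data(res):
    # Pick the nested randomuser.me fields we care about and flatten them.
    data = {}
    data['first_name'] = res['name']['first']
    data['last_name'] = res['name']['last']
    data['gender'] = res['gender']
    data['address'] = f"{res['location']['city']}, {res['location']['country']}"
    data['email'] = res['email']
    data['username'] = res['login']['username']
    data['picture'] = res['picture']['medium']
    return data
```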
So the next thing we're going to do is set up Apache Airflow on Docker containers. Right now I've only pip-installed Apache Airflow; it hasn't been doing anything on my system. We need a docker-compose file that will initialize Apache Airflow, Kafka, the Schema Registry and the rest. So in our root directory I'll create a new file; I almost called it Dockerfile, but we don't really need a Dockerfile here, what we need is a docker-compose file, so we can spin everything up and down at any point we want. I'm going to start with ZooKeeper so we can lay this architecture out nicely: the dependencies run from Apache ZooKeeper down to Kafka, Kafka is connected to the Control Center, then the Schema Registry, and the Airflow part stands alone because it is not connected to the Confluent stack (we're using the Confluent platform images here). Then we'll have a separate block for Apache Spark and a separate one for Cassandra. So let's see how that goes. This ZooKeeper definition is basic stuff and should get ZooKeeper ready for operation; if you need some more detailed configuration, maybe there are special parameters you want to tune, I think it's best to consult the documentation on the different environment variables and tweak the configuration from there. For now this should do for our case.
Next we're going to have the broker. The image also comes from confluentinc, and it's going to be cp-server; if you check Docker Hub you should see it there. So we've just ticked off Apache ZooKeeper, and the next one we're working on is Kafka, the broker server itself. After that we go to the Schema Registry, and I think the only dependency we have left is the Control Center. Technically the Control Center is dependent on the Schema Registry, because the Avro schemas that the Schema Registry serves are what allow the Kafka data to be visualized on the UI; the Control Center listens for events on the Schema Registry to visualize the data coming through Kafka, which in turn is managed by ZooKeeper. That's the dependency dynamic. Anyway, we just need to add the Control Center. We don't have any engines running yet, but if you look at my local images I already have these images downloaded from the Confluent Docker Hub, so all I need to do is run docker compose up in detached mode.
If this is your first time and you don't have these images, docker compose is going to pull them down to your system first; because I've already done that, you're not seeing the pull, it just spins up the containers from those images directly. Now you can see what I was talking about earlier: it was only when ZooKeeper reported healthy that the broker started, and you can see the other containers are created but waiting for the broker to be healthy before they continue; in the UI they show as not started. Now that the broker is running, its healthcheck says it is ready to accept connections, and the rest follow. That's the dependency chain: each one waits until the previous one is done. If there's any error at this point we need to check that the environment is ready to accept connections before we continue, fix the error, and carry on. The Schema Registry will probably take the most time, because its healthcheck interval is 30 seconds with a 10-second timeout; maybe I should have reduced that, but let's see. These are just warnings, so we should be fine. The Schema Registry is okay now and the Control Center is coming up... and the Control Center has started as well, which is fine. At this point we're done with this part; we only need to confirm on the UI that the environment is up and running and ready to accept connections. So we click through, or go to localhost:9021. I think we're a little bit early; the Control Center is still starting up, so we wait until it says it has started its network server and is ready. If we refresh this page we should be good: this is our Control Center.
The point of those healthchecks and connections is to be sure that we have the right brokers and that we can visualize the data. Right now we don't need any connections to ksqlDB or Kafka Connect, which is fine; we may do that in a different tutorial, but not in this one. Let's focus on the broker, which is the most important thing here, and where we can see the topics plus production and consumption. In the UI we have the broker overview, the topics, and Connect, where we don't have anything. Under the brokers you can see what has been produced in the last few seconds, around 24 bytes per second, and so on; ZooKeeper shows as connected, self-balancing is on, and the rest is fine: the broker name, the bytes in and bytes out, and the other properties. What we really want is the Topics section. At this moment we don't have any topics created yet, and we don't need to create anything by hand; that happens automatically the moment we start publishing data into a topic. So let's continue; that's the second part of the architecture done. Going back to kafka_stream.py, we now want to connect to the Kafka queue, publish one or two records into it, and see whether it is working or not. We need to install a package for that.
That package is kafka-python, so pip install kafka-python. By the time we get the data from the Random User API we format it, which is what we have here, and right now we're just dumping it to the console; we don't need to print it anymore, we can go ahead and publish it. So I'll import KafkaProducer from kafka, and I'll also import time, because eventually we want to produce for a set amount of time. For now I just want to send a single record to the producer, so let's initialize the producer at this point: KafkaProducer with bootstrap_servers pointing at the broker. Inside the Docker network that would be broker:29092, but because we're not running inside the containers at this point we need to use the external address, which is localhost on port 9092. I'll also set max_block_ms, which is the send timeout, to a few seconds. Okay, so let's publish the data we get from format_data and push it to the queue: producer.send to the users_created topic, with json.dumps of the data encoded as UTF-8, and it's not data but res, of course. I think this should get the data onto the queue; let's run python kafka_stream.py and see whether it is able to push data to the queue.
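Put together, the one-off producer test looks roughly like this, using the get_data and format_data helpers sketched above; the topic name follows the video, while the exact timeout value is an assumption:

```python
import json

from kafka import KafkaProducer

# Connect through the host-mapped listener while we're still running outside Docker.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         max_block_ms=5000)  # send timeout (value assumed)

res = format_data(get_data())
producer.send('users_created', json.dumps(res).encode('utf-8'))
producer.flush()  # make sure the message actually leaves before the script exits
```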
Are we able to connect to the queue? The logs say it's connecting to the broker, probing the connection... connection complete. So we're able to connect and push data. If we come back into the topic view and refresh, excellent: we have a users_created topic and there is data on the queue. Because we weren't listening at the time, we can't see that first message, so if I run this again we should see new messages coming in. Good, so this is the data coming in: this record's first name is Wyatt Willis, from a town in Australia, and so on. We didn't print it out locally, but that's the data coming in, so we're able to push data to the queue, and that's a good win for us. At this point we have ZooKeeper connected to Kafka successfully, and we're able to use the Schema Registry's Avro support to visualize the data on the topic, so this part of the system is good.
Now we need to push data from Airflow to Kafka (not Postgres), but first we need to initialize Airflow itself with a Postgres server backing it. So we come back to the docker-compose file, and we add our Airflow webserver. The final thing before we spin this up is the entrypoint script. We need a directory called script containing entrypoint.sh, and in that entrypoint.sh we write the sequence of commands Airflow should follow when it's initializing the webserver or the scheduler; it starts with a bash shebang, since this is our entry point. We add it to the volumes, so that ./script/entrypoint.sh is synchronized into /opt/airflow/script/entrypoint.sh inside the container, and the compose entrypoint points at /opt/airflow/script/entrypoint.sh as well. Because we're connecting to Postgres, we also need to set the SQLAlchemy connection, which is the Airflow environment variable with the double underscores (the SQL_ALCHEMY_CONN setting), plus the executor setting, which for now is the sequential executor. I think this should be fine. The last part is Postgres itself: I just need to define the postgres service on the same network. One other thing that's left is the scheduler, which isn't defined yet, so we write a similar service definition for the scheduler. Let's spin it up and see if there's any error, so we can quickly debug and continue; the webserver is coming up.
Let's see if it's working as expected. Okay, good: it loaded the requirements.txt and it's running, which shows that the environment is set up properly and things are behaving as expected. The webserver is running at 8080. Because of the healthcheck we set in the docker-compose (it checks every 30 seconds), even though it's ready we just need to wait about 30 seconds before it shows up as healthy. While that's happening we can go to localhost:8080 and see whether our Airflow is set up properly. Airflow is running, we sign in with admin/admin, and yes, Airflow is running as expected, but there is no scheduler yet. We're also using the SequentialExecutor, which is not advisable for production because it runs your tasks one after the other; if you want to run tasks in parallel in a more scalable way, you should use something like the CeleryExecutor instead. The webserver warning is easy to fix, and then the scheduler should come up. The scheduler is now up and started; it's doing the same installation, because we need to install the requirements on the scheduler too, so let's see if it also runs as expected.
While that installation is going on, so we don't waste too much time, let's go back to kafka_stream.py, fine-tune it, and set up the DAG properly so we can start seeing some movement on the UI. I'll uncomment the DAG code and drop the direct stream_data() call at the bottom; I know it works, I've seen data pushed to the queue. However, we need to adjust it now that it will be running on the Docker instance: the bootstrap server changes from localhost:9092 to broker:29092, the internal address. I'm going to take a quick pause while the scheduler finishes, and resume once it's done. Now that the scheduler has finished initializing, the next thing is to check the UI again, so let's refresh. The warning message is gone and we can see our user_automation DAG, which is a good one. Now that we have the user_automation DAG, you can see the grid view initializes properly, and we have the stream_data_from_api task, which is good. The next thing we want is to make sure we can stream data from the Random User API directly into Kafka continuously, as much as possible, instead of producing just once.
So instead of the single produce we've been doing, we want to publish everything coming from the Random User API into Kafka directly, which means updating this particular function: stream_data gets the data and sends it to Kafka. I'll move the Kafka producer setup inside the function, and since we've already imported time, we'll use it to bound how long we produce for. We could stream for two minutes or five, but one minute is enough, since we'll be producing rapidly. The idea is: record the current time when we start, and in a loop, if more than one minute has passed, break; otherwise fetch the data, format it, and send it to the producer, so the get/format/send code moves inside the loop. While we're within that zero-to-sixty-second window, we hit the randomuser.me API as many times as possible, get the data, format it, and send it to the queue. We also wrap the body in a try/except: except Exception as e, import logging and log that error, then continue. That way, even if there's a network downtime for a couple of seconds, or thirty, it doesn't matter; we just log that particular error and continue the loop, and once the minute is up, we break. That's it for this adjustment.
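Pulling those pieces together, the updated stream_data looks roughly like this (a sketch that reuses the get_data and format_data helpers above; the exact timeout values are assumptions):

```python
import json
import logging
import time

from kafka import KafkaProducer

def stream_data():
    # Produce formatted records to Kafka for about one minute, logging and
    # skipping over transient failures instead of killing the task.
    producer = KafkaProducer(bootstrap_servers=['broker:29092'],  # internal address inside Docker
                             max_block_ms=5000)
    curr_time = time.time()

    while True:
        if time.time() > curr_time + 60:  # stop after roughly one minute
            break
        try:
            res = format_data(get_data())
            producer.send('users_created', json.dumps(res).encode('utf-8'))
        except Exception as e:
            logging.error(f'An error occurred: {e}')
            continue
```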
With that adjustment in place, we go back to the UI, refresh, and trigger the DAG from there. It's not going to trigger automatically because we didn't enable catch-up, so I turn the DAG on and trigger it manually; user_automation should start any moment now. Do we have anything in the logs? Not yet, so let's follow the production on the Kafka side instead. I'll refresh the Control Center page so we're listening for new events, and wait for the run to get triggered. Okay, it's running now, and good, data is dropping on the UI, and it's going to keep dropping for at least one minute. We can list the messages from here and see the offset numbers and the data that's coming in. So while that's going on, all of our data is flowing fine.
If you check our architecture diagram, we've now taken care of this part, so we want to set up the Spark architecture and Cassandra, which are the parts that are left. We have the master-worker architecture for Spark; for the purpose of this session I'll just use one worker and show you how to add the other workers if you want them, and then we'll go from there. Back in the docker-compose file I'll paste in the master-worker definitions and quickly talk through them. The Spark master comes from the Spark image (latest tag). The command to start the master calls bin/spark-class with the master deploy class, and it exposes two ports: 9090 for the web UI and 7077 for communication with the workers. If you have multiple workers, they all talk to the master on 7077, while you access the UI on 9090; of course they have to be on the same network so they can work together. For the worker, if you want multiple workers you just replicate the definition: copy it, change the name from spark-worker to something like spark-worker-2, and keep the same configuration; the dependency they have on the master stays the same. It's the same spark-class command, but instead of the master class they all run the worker class, communicating with the master on the same container name and port. That's how to add multiple workers, but for the sake of speed I'm going to leave it at one. These are the environment variables for the worker: we give it two cores and a minimum of one gigabyte of memory. If you reduce that one gig to something like 500 megabytes, you're going to get an error; the workers do most of the work, and I tested this with less than one gig and was having errors, so I think it's best to have a minimum of one gig. You can also check the documentation for more information on that.
Now for the Cassandra database, we're going to use the latest Cassandra image. The container name and the hostname are the same, we expose it on port 9042, the maximum heap size is 512 MB and so on, and the username and password are cassandra/cassandra. This could also mount a local volume named after the host, and if you wanted to load data into Cassandra you could do that through it, but it's not necessary; we don't really need a volume here. And that's it, everything has to run on the same network. So if we do a docker compose up in detached mode... it says the postgres networks value must be a string. Looking at the services, that's just something I added by mistake the other time, so we delete it and go again. All right, Cassandra is running.
If we do a docker ps we can see ZooKeeper, the Schema Registry, the scheduler, the Spark worker (started), Cassandra and the Spark master, so the master and the worker are both up. Back in the data-engineering project I'm going to create a new file and call it spark_stream.py; this is where we'll be writing our code for the Spark streaming job. We have all the other dependencies working, so what we need now is to install the Cassandra driver, and PySpark as well: pip install cassandra-driver so we can communicate with Cassandra, then pyspark for the Spark dependencies. While PySpark is installing I'm going to continue coding, and now that all of the dependencies are done we can continue.
In spark_stream.py we import SparkSession, and from the same package's functions module we import from_json and col; let's start with those for now. The first thing we want to do is create a keyspace on Cassandra before anything else, because the keyspace is like a schema: for people who know SQL and Postgres, it's just like the public schema, and within each keyspace you have multiple tables. So we'll have a create_keyspace(session) function; I'll just put a pass in there for now, because we create the keyspace here but we need to create a connection first. Then create_table(session), where we create the table, and insert_data(session, **kwargs), where we insert the data we're fetching from Kafka. Finally we need to establish the connections between Kafka, Spark and Cassandra, so let's say create_spark_connection and create_cassandra_connection. Let's start implementing these and work our way up: once we have a connection to Spark, we create the connection to Cassandra, and then we create the keyspace, the table and the rest, starting with the main block.
We start with if __name__ == "__main__": and spark_conn = create_spark_connection(); we'll write the implementation shortly. In create_spark_connection we want to establish the connection to Spark inside a try block: s_conn = SparkSession.builder, then the application name, say SparkDataStreaming, and then the config, which is where things get very interesting. With Spark we need to specify the Java packages that will be used; two jars are needed here, one to create the connection between Spark and Cassandra and the other between Spark and Kafka. The first is com.datastax.spark's spark-cassandra-connector. If you want to find it, go to the Maven Repository and search for the Spark Cassandra connector; under com.datastax.spark you'll see the connector, and the latest release is 3.4.1, which is what we'll be using. We need to reference it correctly, which means including the Scala version and the release version: spark-cassandra-connector_2.13, version 3.4.1. The other one is the Spark SQL Kafka connector, which is for structured streaming: if you look it up you'll see the module is spark-sql-kafka-0-10 with the same _2.13 Scala suffix, and you can download the jar and include it in your PySpark installation later if needed. So back in the code that's org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.1. Those are the two packages, the two Java artifacts we'll need. Next we just need to configure the hostname for Cassandra, which is spark.cassandra.connection.host, and we set it to localhost. When this eventually runs inside Docker that host would be the container name, but to test locally we use localhost for now, and this should give us access to Spark with getOrCreate. Then we add except Exception as e and log the error, something like "couldn't create the spark session due to exception {e}". With this we should be able to establish a connection, and at that point we do s_conn.sparkContext.setLogLevel("ERROR") and log an info message that the Spark connection was created successfully. So, to recap what we've just done: we're creating a Spark session with this application name, using those two Java packages, the spark-cassandra-connector for Scala 2.13 and spark-sql-kafka, and we configure the Cassandra host; s_conn starts out as None and we return whatever we end up with.
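Assembled, create_spark_connection looks roughly like this; spark.jars.packages is the standard config key for pulling Maven artifacts, and the coordinates follow the versions mentioned above:

```python
import logging

from pyspark.sql import SparkSession

def create_spark_connection():
    # Build a SparkSession with the two connector packages discussed above.
    s_conn = None
    try:
        s_conn = (SparkSession.builder
                  .appName('SparkDataStreaming')
                  .config('spark.jars.packages',
                          'com.datastax.spark:spark-cassandra-connector_2.13:3.4.1,'
                          'org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.1')
                  .config('spark.cassandra.connection.host', 'localhost')
                  .getOrCreate())
        s_conn.sparkContext.setLogLevel('ERROR')
        logging.info('Spark connection created successfully!')
    except Exception as e:
        logging.error(f"Couldn't create the spark session due to exception {e}")
    return s_conn
```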
The next thing we want to do is establish the connection to Cassandra, and we do that with a Cluster, which we import from the Cassandra driver. We connect the cluster to localhost: cluster = Cluster(['localhost']), which connects us to the Cassandra cluster, and then we have a session created from the cluster with cluster.connect(). With that session we can return it. I think we should wrap this in a try/except as well: except Exception as e, log "could not create cassandra connection due to {e}" and return None. Back in the main block, if spark_conn is not None we continue and establish the Cassandra connection at this point. There's a small naming issue here, the variable is shadowing the name outside, so I'll just call it the Cassandra session. If we have a connection we return it, and if not we return None; and in main, if the session comes back as None there's no need to continue, we just return, so we only carry on when it is not None.
So, only when the session is not None do we carry on: we create the keyspace with the session that we have, and then we create the table with the same session. For the keyspace we just call session.execute with the CREATE KEYSPACE statement, and because we added IF NOT EXISTS, even if we run this multiple times it effectively only takes effect once. We do the same for the table.
To insert the data into Cassandra, we extract the fields from the kwargs passed into insert_data; that gives us access to the values, and then we try the insert, which is another session.execute with an INSERT statement. The last bit is connecting to Kafka so we can extract that information from it. So we add a function that connects to Kafka using the Spark connection and gives us a spark_df: basically we use the Spark connection to read data from Kafka on that server, subscribed to the users_created topic, starting from the earliest offset, load it, and return the dataframe. (There was a small syntax slip in there, just a missing character, but that's sorted.) That's pretty much it for the Cassandra connection as well, so I think we can test this out. Once we've connected to Kafka we need to get the dataframe in main: after we create the Spark connection we call df = connect_to_kafka(spark_conn), so we connect to Kafka with the Spark connection. The rest is pretty descriptive: we create the keyspace and the table.
Instead of inserting directly, record by record, we're going to write a stream: we're not doing it just once, we're doing it continuously, so we need a streaming query. For that we need to select the data, a selection dataframe, and I think we need a function for it, because once we have the raw Kafka dataframe we have to structure it in a way that can be inserted into Cassandra. The way to do that is with struct fields, so we'll have create_selection_df_from_kafka, which takes the Spark dataframe, and we need a schema at this point: a StructType made up of StructFields. What we're doing, basically, is defining a schema so that when we read data from the Spark dataframe we convert whatever we read into this particular schema and then select the fields from it, selecting everything from that column. Whether there are a hundred, a thousand or a million records, it takes all of them off the Kafka queue; even if it's just one record, it parses it into this schema and selects the data to be inserted. Then we just do selection_df = create_selection_df_from_kafka(spark_df), using the dataframe we're getting from Kafka.
Then we take our selection dataframe and do a writeStream from it, in the format org.apache.spark.sql.cassandra. We add an option for the checkpointLocation; it's not compulsory, but it's good to have a checkpoint in case of failure. Then an option for the keyspace, which is spark_streams (since we're putting everything in it), and an option for the table, which is created_users. The last thing we need to do is start the stream, and on the streaming query we await termination. There are examples of all this in the Spark structured streaming documentation; everything I wrote here appears somewhere in those docs, including what to do if you have multiple brokers, so you can come in, copy, paste and adapt. That's where I got most of this code from; I'm just replicating it in our script.
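Putting the main block together with the streaming query, it reads roughly like this; the checkpoint path is arbitrary and just an example:

```python
if __name__ == "__main__":
    spark_conn = create_spark_connection()

    if spark_conn is not None:
        spark_df = connect_to_kafka(spark_conn)
        selection_df = create_selection_df_from_kafka(spark_df)

        session = create_cassandra_connection()
        if session is not None:
            create_keyspace(session)
            create_table(session)

            # Stream the parsed records straight into Cassandra.
            streaming_query = (selection_df.writeStream
                               .format('org.apache.spark.sql.cassandra')
                               .option('checkpointLocation', '/tmp/checkpoint')
                               .option('keyspace', 'spark_streams')
                               .option('table', 'created_users')
                               .start())

            streaming_query.awaitTermination()
```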
So let's spin our Docker containers back up and detach them. I think we have an error somewhere; it's just a stray comma, and the editor also underlines the keyspace name in the SQL string simply because it doesn't recognize it, which is fine. While we wait for everything to come up... now that the installation is done, our webserver is back up and running, and the scheduler is also up and listening. We also need to see why our Control Center has gone down; there's really no need to start it back up. We don't need the Control Center at this point, because we know our data will always be produced and we don't need to visualize it anymore, and we don't even need the Schema Registry, since that's only there for the Control Center. We only need Kafka itself, our broker, to be up so that we can connect to it. Let's see if our webserver is up and running: I guess it is, we sign in with admin/admin, go to the DAG, and there it is with stream_data_from_api. Good. So what we need to check now is Cassandra. Cassandra is up and running, but we need to do a run of spark_stream.py to see whether we can even connect to Spark at all.
It runs through the entry point and tries to establish a connection. Because I don't have these two dependencies locally yet, it should download them, but it says the artifact does not exist and it can't resolve the Cassandra connector. Looking at the Maven repository again, the version is 3.4.1; I'd referenced it slightly wrong, so we just fix the version and rerun. Next it connects but throws a server error, a mismatched input at the end of the file: I hadn't closed a bracket in the CREATE TABLE statement. The keyspace statement was fine, it's only the table, so I fix that and rerun. All right, we're able to create the keyspace and able to create the table, so the only thing left is an error that occurs when it starts the stream and tries to publish to Cassandra through the DataStax connector. The reason is that, because we're using these jar files in the session config, we need to add them as a dependency locally: if you look at our venv, inside lib, and check the PySpark installation, those jar files need to be in there.
If you look in the jars folder of the PySpark installation, we need to find spark-sql-kafka and the Cassandra connector, and we can't find them here, so we need to add them. I've already done the download, so I'm going to copy and paste them in, and while I do that we can go into Cassandra and check things from that side. We exec into the Cassandra container with an interactive terminal and run cqlsh with the username cassandra, the password cassandra, localhost as the address and then the port. Once we're in cqlsh we can do a DESCRIBE of spark_streams.created_users, and we have the created_users table with the id as a UUID and the rest of the columns, the details of the table we created. If we do a SELECT * FROM spark_streams.created_users we get the id, the address and the rest of the columns, which is good: we have direct access to the Cassandra data. So all we need now is that, by the time we're streaming data from Kafka, we push that data into Cassandra, and the only dependencies we need to add are those jar files. For now I'll just exit and do a docker compose down to take down the existing containers, and then we continue from there.
I've already copied the jar files, so what I'm going to do is paste them into that jars directory so we have them as dependencies; it asks me to confirm, and the two jar files are now in there, one being the spark-cassandra-connector and the other the spark-sql-kafka connector. What we need to do now is start docker compose back up: docker compose up in detached mode. The containers are starting up, and you can see that our webserver is having an error. What's the error? I suspect it might be the entrypoint; if we click on the webserver and scroll to the bottom of the logs, yes, as I suspected: command not found, "m". If we go back to the entrypoint script there's a stray "m" on the line where I upgrade pip, so I just delete that and try again. I think it's starting up now.
Outside of that, we need to stop the Control Center and the Schema Registry; once we stop the Schema Registry, the Control Center won't come up anyway because of the dependency. The next thing we need to do is go to the UI and check the status of our Spark cluster, so back in the browser we go to localhost:9090 and refresh. There's our Spark master; the IP address shown is really just the address of the Docker container running it, and we can see the worker ID, no running applications and no completed applications. So let's go back and submit our first job to it. Before we do that, I think we need to comment out the streaming-into-Cassandra part of the Spark script for now; we first want to see whether the keyspace and the table get created automatically. If we do a refresh on the UI we can see the application ID, the name SparkDataStreaming, and that it's been running for 14 seconds; if you click on the application you can see the worker that's running it, the worker ID, the number of cores, the memory and the rest. Going back to the terminal, the keyspace was created successfully and the table was created successfully, but the connection is reporting an error. What's going on? Let's look into the Docker container.
I think we just need to docker exec into it to see what's going on. The script itself looks fine, and if I exec in and run a SELECT against spark_streams it's fine too, so we exit, resubmit the job and check again. It's coming back up; can we see any error? It says it successfully started the service and is connecting to the master, so I think it connected successfully, and if you look at the dataframe we created, the structure going into Spark is fine and Cassandra is fine, so I think we're good, really. The next thing we need to do is submit the job again with the streaming to Cassandra enabled. We refresh the Spark UI on 9090, and we need to log back into Airflow because we spun everything down earlier, then trigger the DAG. Now that the keyspace and the table have been created, we can uncomment the streaming part and resubmit the job; I also added a logging line so we can see when the streaming is being started, and you can see the keyspace and table messages again. So what we're going to do is submit this job to Apache Spark and simultaneously trigger the DAG, so we can start seeing the data being produced to the Kafka queue.
This was the last run that went through, so I'm going to retrigger it so we have a second run, and once that's done we'll do a double check on Cassandra to see whether we're getting a stream of data into it. Right now we need the data to land in Kafka first by triggering this DAG, which is currently running, and from there it goes into Cassandra. I'm going to open a new terminal and exec into the Cassandra container with docker exec -it, then run the cqlsh statements we used before. The producing happens when we trigger the DAG, so the next thing to do is check: SELECT * FROM spark_streams.created_users, and you can see we initially have 59 rows. That's a bit much to read, so I'll just select the first and last names: SELECT first_name, last_name FROM spark_streams.created_users, and there are the first names and last names of the users that have been streamed into Cassandra. So basically that's how to do it, and if we trigger more runs we'll see more records. That about caps it: I think we've satisfied all the items on our architecture.
We have the API, and everything from the API down to Cassandra: we started with Apache Airflow pulling the data from the API, and once we trigger that process it streams the data into Kafka, then through Apache Spark, and then into Cassandra. And there you have it, folks: an end-to-end data engineering pipeline, from data ingestion to processing to storage, all containerized with Docker. If you found this video useful, don't forget to give it a thumbs up and subscribe for more content like this. Until next time, happy engineering.