Hello re:Invent, how's everybody doing? We are doing okay. Ever wonder how to run a massive ads business in the cloud? My name is Ish Singh. I'm a Senior Solutions Architect with AWS, and I work with Amazon Ads. Welcome to Under the Hood at Amazon Ads. Today, we'll start with a short introduction to Amazon Ads. Then we'll cover omnichannel marketing with Amazon Ads, and then we'll talk about the pillars, challenges, and architectural evolution, followed by a deeper dive into ad campaigns and ad serving at scale with high availability and resiliency. To wrap things up, we'll share how Amazon Ads is improving ad selection and relevance with machine learning. And we'll be sharing learnings and best practices throughout. Amazon Ads' core mission is to help build businesses and brands of all sizes, all while creating ad experiences that are useful for customers. Anyone that wants to build their brand, connect with shoppers, grow their sales, and measure results can really benefit from Amazon Ads solutions. We are super excited for this talk because, for the first time, Amazon Ads will be sharing how they handle some of the most scaled advertising requirements in the world today. In this talk, Amazon Ads leaders will walk through how we process petabytes of data, score and match ads, and show hundreds of millions of ads within milliseconds. We'll also be learning how we are optimizing for costs without sacrificing performance or reliability. Let's hear from Rachit Chawla on how Amazon Ads is building and operating at this massive scale. (audience applauds) Thank you, Ish, for the intro. Good afternoon, hello everyone. Hope you are having a good time at re:Invent. My name is Rachit Chawla. I'm a Director of Software Development at Amazon Ads, and I'm going to talk a little bit about
what Amazon Ads offers as products for advertisers to build an omnichannel marketing strategy. We're also gonna talk a little bit about our core services in the ads pipeline, and our evolution and our journey through that. So let's start with what I mean by an omnichannel strategy with Amazon Ads. A shopper or a customer could be anywhere in their purchase journey, and they could be going through this purchase journey across any channel, whether it's mobile, social media, desktop, streaming TV, or even an offline store. At Amazon Ads, we offer products that allow you to increase the awareness of your products if customers are in the early phase of their purchase journey, or, if they are a little bit later in their journey, to increase the consideration of their purchase. So we have products ranging from driving discoverability of your brand with Sponsored Brands, to discoverability of your products with Sponsored Products on Amazon. If you wanna do that both on Amazon and off Amazon, you can do it through Sponsored Display as well. We also have programmatic buying through the Amazon Demand-Side Platform, Amazon DSP, where you can do 1P and 3P ads on various channels: video ads, audio ads, creative ads, all kinds of things on streaming TV, or Fire TV, or other media channels. So before we go deeper into our core services, we wanna talk a little bit about who our customers are. There are three sets of customers that we cater to primarily. First is advertisers. These are folks who use our Amazon Ads console to build and manage their ad campaigns. Next up is shoppers, to whom we show these ads, and we wanna show the most relevant ads to our shoppers. And thirdly, third-party developers, also called partners, who use our public APIs, campaign management APIs, and reporting APIs to build tools and services on top of these, which are used by various advertisers depending on where they are in their marketing journey. Next up, we're gonna talk a little bit about how Amazon Ads runs some of the most complex ad tech workloads, serving ads at a super high scale, hundreds of millions of ads processed per second. We will also talk about our core services throughout this talk
and our evolution of scale and learnings along the way. So let's start with a very high-level big picture. This is a simplified version of what we do. On the left side, there are advertisers and third-party developers who are using our campaign management APIs to build and manage their campaigns. Next up, we have the ad server, which serves these ads, showing the most relevant ads at high scale to our shoppers. And as and when shoppers see these ads, we record events. These events could be impressions, clicks, or views. And we do that in a real-time streaming manner so that our budgeting service can enforce that spend never exceeds the budget that customers had planned. We then record all these ad events in a petabyte-scale data lake, and enable real-time analytics and real-time machine learning on top of that. These machine learning tools are super important for building sophisticated models, as it's very critical to show the most relevant ads to our shoppers. So let's go a little bit deeper into our campaign management set of services. As you can imagine, these are the set of APIs we provide to our advertisers so
that they can build and manage their campaigns. In addition to the CRUD APIs (create, read, update, delete), we also had use cases to support list, search, and query kinds of APIs. And we needed to do that for tens of millions of campaign objects with low latency. We're talking about a few hundred milliseconds of read-write latency. We were able to use purpose-built data stores like DynamoDB and Elasticsearch to meet our CRUD, list, and query use cases. We also used Kafka to build a publishing pipeline for campaign metadata.
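To make that concrete, here is a minimal, hypothetical sketch of the pattern just described: campaign objects are written to a DynamoDB table for CRUD access, and list/search style queries are served from an Elasticsearch index. The table name, index name, and field names below are illustrative only, not the actual Amazon Ads schema.

```java
import java.util.Map;

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;

public class CampaignStoreSketch {

    private final DynamoDbClient ddb = DynamoDbClient.create();
    private final RestHighLevelClient es = new RestHighLevelClient(
            RestClient.builder(new HttpHost("search.internal.example", 9200, "https")));

    // CRUD path: the campaign object is written to DynamoDB, the system of record.
    public void createCampaign(String campaignId, String advertiserId, String name, long budgetMicros) {
        ddb.putItem(PutItemRequest.builder()
                .tableName("Campaigns")                       // hypothetical table name
                .item(Map.of(
                        "campaignId", AttributeValue.builder().s(campaignId).build(),
                        "advertiserId", AttributeValue.builder().s(advertiserId).build(),
                        "name", AttributeValue.builder().s(name).build(),
                        "budgetMicros", AttributeValue.builder().n(Long.toString(budgetMicros)).build()))
                .build());
        // Propagation to Elasticsearch happens asynchronously (DynamoDB Streams -> Kafka -> indexer).
    }

    // List/search path: advertiser-facing queries ("all campaigns whose name contains X") go to Elasticsearch.
    public SearchResponse searchCampaigns(String advertiserId, String nameContains) throws Exception {
        SearchRequest request = new SearchRequest("campaigns");  // hypothetical index name
        request.source(new SearchSourceBuilder()
                .query(QueryBuilders.boolQuery()
                        .must(QueryBuilders.termQuery("advertiserId", advertiserId))
                        .must(QueryBuilders.matchQuery("name", nameContains)))
                .size(50));
        return es.search(request, RequestOptions.DEFAULT);
    }
}
```

The point of the split is that DynamoDB gives hands-off partitioning and scaling for point reads and writes, while Elasticsearch handles the list, search, and filter patterns that DynamoDB is not built for.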
We were super happy to use Elasticsearch and get six nines or higher availability for our campaign services. Next up, I'm gonna talk a little bit about our highly available and scalable budgeting service. The primary use case of our budgeting service is to make sure we never end up with over-delivery, because if spend is greater than budget, over-delivery happens, and that leads to a bad customer experience as well as a loss of millions of dollars for Amazon that can never be collected. So our use case was to build a real-time streaming, low-propagation-delay, high-scale budgeting pipeline that can aggregate spend in a real-time manner, and we used services like DynamoDB, DynamoDB Streams, Kinesis, and Lambda to build this high-scale budget aggregation pipeline. Our other use case here was to make sure it auto scales, and auto scales in a hands-off manner. Using some of these AWS services, we were able to achieve this elasticity for peak event days like Prime Day. At the same time, we were able to reduce our costs as we moved to this new architecture along the way.
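As a rough illustration of that aggregation pipeline, the sketch below shows a Lambda function consuming spend events from a Kinesis stream and applying them as atomic ADD updates to a DynamoDB item, then comparing the running total to the planned budget. The table name, the stream payload format, and the idea of flipping a `paused` flag are assumptions for illustration, not the actual Amazon Ads implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse;

public class SpendAggregatorSketch implements RequestHandler<KinesisEvent, Void> {

    private final DynamoDbClient ddb = DynamoDbClient.create();

    @Override
    public Void handleRequest(KinesisEvent event, Context context) {
        for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
            // Assumed payload format: "campaignId,spendMicros" per impression/click spend event.
            String payload = StandardCharsets.UTF_8.decode(record.getKinesis().getData()).toString();
            String[] parts = payload.split(",");
            String campaignId = parts[0];
            long spendMicros = Long.parseLong(parts[1]);

            // Atomically add this event's spend to the campaign's running total.
            // Assumes the item already exists with its planned budgetMicros.
            UpdateItemResponse resp = ddb.updateItem(UpdateItemRequest.builder()
                    .tableName("CampaignBudgets")                         // hypothetical table
                    .key(Map.of("campaignId", AttributeValue.builder().s(campaignId).build()))
                    .updateExpression("ADD spendMicros :d")
                    .expressionAttributeValues(Map.of(
                            ":d", AttributeValue.builder().n(Long.toString(spendMicros)).build()))
                    .returnValues(ReturnValue.ALL_NEW)
                    .build());

            long spend = Long.parseLong(resp.attributes().get("spendMicros").n());
            long budget = Long.parseLong(resp.attributes().get("budgetMicros").n());
            if (spend >= budget) {
                // Signal the ad server to stop serving this campaign; here just a flag update.
                ddb.updateItem(UpdateItemRequest.builder()
                        .tableName("CampaignBudgets")
                        .key(Map.of("campaignId", AttributeValue.builder().s(campaignId).build()))
                        .updateExpression("SET paused = :t")
                        .expressionAttributeValues(Map.of(":t", AttributeValue.builder().bool(true).build()))
                        .build());
            }
        }
        return null;
    }
}
```

Because the whole chain is streaming, with Kinesis or DynamoDB Streams feeding Lambda and DynamoDB holding the aggregates, it scales out with shard count and needs no manual capacity work on peak days.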
Next up, I'm gonna talk a little bit about our high-throughput ad serving. The primary use case of the ad server, as you can all imagine, is to show the most relevant ads at a super high scale within a tight latency budget. We're talking hundreds of millions of ad objects processed per second in less than 120 milliseconds. As you can imagine, this requires a lot of data to be pre-computed, a lot of data to be cached, and a lot of machine learning model training and scoring output to be stored very close to the ad server so that we can process all these ads at such hyperscale. We were able to use ElastiCache Memcached, a pre-computed data store architecture, and caching, as well as a horizontally scalable architecture, to achieve the scale we are operating at, and within the low-latency budgets. And again, we are dealing with hundreds of millions of ad objects processed per second, with tight latency budgets of 120 milliseconds or so. Next up, as you can imagine, when these ads are shown, we record ad events: clicks, impressions, views, conversions.
We need to do real-time analytics on them. Our use case was to react to these ad events in near real time so that we can improve the shopper experience and improve advertiser efficiency, and move away from the daily or weekly batch mode we used to have. We were able to use tools like S3, Glue, Kinesis, SNS, SQS, EMR, Spark, et cetera, to record all these ad events in a hyperscale data lake, and then enable near-real-time analytics for our business metrics and also for the metrics that we show to our advertisers, as well as enabling near-real-time deep learning so that we can show the most relevant ads to our customers. Next up, as you can imagine, deep learning and machine learning are very critical to the overall ads architecture, and we have evolved them over time to improve our relevance and accuracy. Our use case here was to score thousands of ads in a request at a TPS of about 100k to 200k plus, and score all these ads and predict how they will perform within a very tight latency budget of 20 milliseconds. And as you can imagine, the more relevant the ads are, the more likely it is for customers to click on those ads. It's better for the shopper experience, and it's also better for the flywheel that it creates. So we were able to use SageMaker, GPUs, and ElastiCache, caching a lot of data (again, ElastiCache comes up again), and build a combination of online and offline scoring models to achieve this level of scale and this level of prediction and scoring in our world. And more importantly, these systems were built in a way that they are horizontally scalable as well, so they could continue to scale as our business grows.
So I'm going to hand it off here to my colleague, Anuj Joshi, who's going to go deeper into some of these areas. Thank you. (audience applauds) Thanks, Rachit. So folks, now we go deeper into the evolution of how we built these systems. It was not a simple start. We wanted to establish some design principles, and then go one by one through how we grew these systems. First things first, we focused on iterative development. Plan just enough; don't plan too far out. Too much planning is difficult and very inaccurate; you won't be able to get it right. The next one was that we focused on event-driven architecture. We found that most of our systems did not need real-time request and response, so we pre-computed more and more of the data and got it cached into the systems, so that the online compute got smaller and smaller. Third, we did not want to box our business into a box. We had to scale horizontally as the business grew; we did not want to over-emphasize and over-optimize within a single host. We should be able to throw hardware at things, and that should scale horizontally. Next, we focused on the majority of the benefits: do not make perfect the enemy of good. And lastly, we stuck to the Amazon leadership principles. We continued to invent and simplify and deliver results as we went about it. Now, let's go deeper into our campaign management system. This is the system which is there to store the advertiser's intent, what they wanted to do. The other responsibility of the system was also to vend out the updates to these campaigns to other advertising systems, where the pre-computed data would be made available. We started very simple.
There was a simple Java service. It wrote the data into RDS, a Postgres database. We polled the Postgres database every 30 seconds, wrote the data into a file journal, and through that everybody else was reading and streaming the data out. We soon realized that for larger advertisers, Postgres was going slow; we were not able to keep up with it. We went on to shard the data based on advertiser ID: some of the data would go into one shard, and based on another advertiser ID, it would go into another shard. The polling got a little more complicated, but being naive, we were loving the complexity. We were having fun building this complicated system. The complexity continued to grow. The business continued to grow. We had more and more shards, more and more load, and the downtimes were increasing. And every time we had to reshard, there was a manual outage. There was an operator sitting there whose heart would be pounding: "Oh, my God, what's gonna happen?" I did not like that. That's not the developer experience I would go for in my systems. This was the first overhaul of the systems. What we realized was we were building a system which needed automatic partitioning and automatic scaling, and DynamoDB seemed to fit the bill. However, there were challenges. The relational data model had to be converted into a NoSQL data model. The next one was that SQL queries with joins and where clauses could be written very easily before; you can't do that on DynamoDB. So we augmented DynamoDB with Elasticsearch. To make the data congruent between these two systems, we hooked onto the DynamoDB Streams, put them onto Kafka, and streamed that out to the other clients as well as to one of our own clients, which would start writing to Elasticsearch. I specifically chose the word congruent, not consistent, because now I have two data stores, and I can no longer guarantee that as soon as the data is written, I can read it from the other system. There were delays in there, and this deteriorated our performance. What was a strongly consistent system is no more a strongly consistent system. Okay, how do we get the data fast into Elasticsearch? We started writing directly from the Java application to Elasticsearch. Okay, my P99 latencies came down from about three seconds to one second.
But my P100s, where the fast path would fail, were still coming via DynamoDB, DynamoDB Streams, and Kafka, and that was taking about 30 seconds. That's not good. It's still not raising the bar; we had to innovate more. We continued to dive deep, went in to see what was going on there, and we could not get answers. And then some genius told us, "Please upgrade, you're using an older version of Elasticsearch." (laughs) We upgraded 10-plus versions and our latencies came down to less than 250 milliseconds. And that was something that we loved. There was a lot of success along the way. By the time we got to today's world, we are processing thousands of events. Our availability is more than 99.9. We are truly horizontally scalable. The events are getting to our partners within about two to three seconds, where originally, when we started, it was 30 seconds. To make the data closely available in all the regions, we also employ DynamoDB global tables here. The other fun part with this async propagation over DynamoDB global tables is that we can now employ other data processing on top, maybe localization, if you want to localize the data in particular regions outside of the main region. So that was the success with my campaign management systems.
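For the propagation path just described (DynamoDB Streams into Kafka, and a consumer that indexes the campaign updates into Elasticsearch), a minimal consumer might look like the sketch below. The topic name, index name, and the assumption that the Kafka message value is already a JSON document keyed by campaign ID are all illustrative, not the real pipeline.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

public class CampaignIndexerSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.internal.example:9092"); // hypothetical brokers
        props.put("group.id", "campaign-es-indexer");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             RestHighLevelClient es = new RestHighLevelClient(
                     RestClient.builder(new HttpHost("search.internal.example", 9200, "https")))) {

            consumer.subscribe(List.of("campaign-updates")); // hypothetical topic fed from DynamoDB Streams
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) {
                    continue;
                }
                // Batch the updates into one bulk request to keep indexing cheap at high throughput.
                BulkRequest bulk = new BulkRequest();
                for (ConsumerRecord<String, String> record : records) {
                    bulk.add(new IndexRequest("campaigns")        // hypothetical index
                            .id(record.key())                     // campaign ID as the document ID
                            .source(record.value(), XContentType.JSON));
                }
                es.bulk(bulk, RequestOptions.DEFAULT);
                consumer.commitSync();                            // commit only after a successful bulk index
            }
        }
    }
}
```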
Storing the data was one part of it, and streaming it was another. Now, processing it in a real-time request-response system, the ad server, was yet another challenge. This system would source all the data, process the ads in real time, and finally select the few which need to be shown to the shopper. Historically, this system was built to be optimized with everything put onto a single host. The myth was: if you go over the network, you will incur latencies. And when we are processing a lot of data, every millisecond counted. It was not easy to break that myth. We had about 300 gigs of search index of the whole of campaign management on the host. We had 150 gigs of machine learning components, machine learning models, data models, et cetera, on the host. And we had about 300 gigs of caches sitting on the host, which would decorate my ad campaigns with more data, which would eventually help our machine learning systems. Back in 2018, we were processing somewhere about 1.6 million ads per second.
And for each request, we were sourcing somewhere around 400 ads. For all of these requests, we had to respond within a 120 millisecond budget. I'll continue on, but this budget will not change. We will continue to scale, but these 120 milliseconds will not go anywhere. As the tenets for improving the system, we said that we want to go to a distributed, service-oriented architecture. What we were eyeing was that each individual component should be able to innovate independently. We should not be boxed in a box. The next one was that we wanted to pre-compute a lot. We did not want more and more to happen in real time. You got the data, you compute it, you give it to the ad server in a pre-computed manner so that it uses it and moves on. Don't waste time there; it's precious. The third was that we wanted to be horizontally scalable. If there is more data needed, we can't serve more from a single host. It will cap out at some point in time, and the business would always want more. There will never be a business which wants less. And the last was transparency. We needed transparency. Over time, what had happened was, to optimize this single machine processing a lot of data, people had skipped even logging what was going on in the system. So it was very opaque. Now we did not know what was going on in the system. Yeah, we were making money for the business, but was that all we could achieve, or was there more we could achieve? We did not know that. To start with, on going distributed, we first eyed the major components. Sourcing was already taking somewhere about 60 milliseconds, and machine learning was already taking somewhere about 30 milliseconds.
If we went over the network, we would probably add two to three milliseconds. We could absorb that for these key components. The big learning, however, was that we went over the network and measured our latencies, and wow. What had happened over time, because we were boxed in that box, was that the CPU had clogged up. And now that we were not doing so much on that host, the compute became faster. We were now able to leverage that; the overall pluses and minuses played out so well that the network increments were less than the savings that we got on the ad server. Now, the problem left with our systems was to move these caches out. This was a challenge because these were still larger components. Caches would take about two to three milliseconds to read, so getting them over the network was tough. More so, we had some in-memory files also sitting out there, which would give me the data in 200 microseconds. How would I take those over the network? The journey of improving the sourcing and machine learning systems went its own way; they used purpose-specific hardware, and the machine learning components will be covered by my colleague Kun later in this conversation. Let's focus more on how we built the caching solutions at scale. Again, we started simple. There was a Java client. We made it talk to a Redis cache and thought everything should work fine. Our largest cache was reading about five keys per ad, which meant that we needed to read the data at about six million keys per second. What we realized was that at P99, the latencies were up to 14 milliseconds for this cache. That would not fly; the business would not allow this.
We did not have that much time. Okay, we changed Redis to ElastiCache Memcached. That worked: the latencies were flat. This was also the solution we were using on-host, so it worked at least, but we did not get any data replication now. So if there is a data loss and even 1% of the data goes away, that means a burst of 60,000 TPS going to the data custodian systems, and that would surely brown them out. We can't build systems which can take that much burst traffic. Okay, more innovation was needed. We said, why not create multiple replicas of the Memcached itself? So we created a campaign management service which started writing the data into multiple replicas of the cache. The Java client also became a little smarter. It would randomly pick any two of those replicas and pull the data in. And just in case the data was not available in one of the replicas, the other replica would at least serve the request, and the ad server would continue to work fine. The business would continue to work fine. We achieved higher availability clearly with this, but the data recovery was still not there. What happens to the data that was lost? We had to somehow fix that. We added more optimizations into the Java client. And to note, these are the places where we had to really invent, right? There was no simple way where we could just upgrade, as we had been able to do earlier. This one was a little more complicated. So now my Java client would read these two data elements and compare whether both of them are the same. If they're not, then the key was published to a reconciliation stream. This was the Eureka moment where we did not have to go to the data custodian systems: I have so many replicas, just pick the data from there and put it back into the caches. So these published keys were consumed by my campaign management service. It would go query each of the replicas out there, establish a quorum, establish what the right data is, and write it back into the caches.
In the edge case where it was not able to establish a quorum, it would go back to our data custodian systems and get the right data from there. But only the keys that were actually being read would be back-filled in this manner; not all the data would be back-filled this way. What we saw practically was that caches are pretty highly available. They don't go down so often or so easily. And what we focused on was, rather than having all the data in the caches, focus on the hit rate of the caches: if the data is being read, it will be reconciled and made available automatically. Your business will not be impacted by this. However, to play safe, we built dynamic configurations, and we separated which cache replicas we are reading from and which cache replicas we are doing reconciliation on. We still built a solution for the doomsday case where, say, all the caches just go away. We started to back up our data on S3 and DynamoDB, but luckily we've never had to exercise that till date. It's been about two years that this has been running.
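The read side of that replica scheme can be sketched roughly as below: the client reads the same key from two randomly chosen Memcached replicas, serves whichever copy is available, and publishes the key to a reconciliation stream when the copies disagree or one is missing, so it can be repaired in the background. Client construction, the stream name, and the key format are assumptions for illustration.

```java
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import net.spy.memcached.MemcachedClient;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;

public class ReplicatedCacheReaderSketch {

    private final List<MemcachedClient> replicas;   // needs at least two replicas
    private final KinesisClient kinesis = KinesisClient.create();

    public ReplicatedCacheReaderSketch(List<InetSocketAddress> replicaEndpoints) {
        this.replicas = replicaEndpoints.stream()
                .map(addr -> {
                    try {
                        return new MemcachedClient(addr);   // one client per cache replica
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                })
                .collect(Collectors.toList());
    }

    public Object get(String key) {
        // Pick two distinct replicas at random and read the same key from both.
        int first = ThreadLocalRandom.current().nextInt(replicas.size());
        int second = (first + 1 + ThreadLocalRandom.current().nextInt(replicas.size() - 1)) % replicas.size();
        Object a = replicas.get(first).get(key);
        Object b = replicas.get(second).get(key);

        // If the copies disagree (or one is missing), flag the key for background reconciliation.
        if (!Objects.equals(a, b)) {
            kinesis.putRecord(PutRecordRequest.builder()
                    .streamName("cache-reconciliation")        // hypothetical stream name
                    .partitionKey(key)
                    .data(SdkBytes.fromUtf8String(key))
                    .build());
        }
        // Serve whichever replica had the data: availability over strict consistency.
        return a != null ? a : b;
    }
}
```

If you need consistency more than latency, the same read path can consult more replicas and take a majority before responding, which is the tip mentioned a little later in the talk.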
To simplify the data ingestion into our systems, the cache manager service was also built out, grown, and augmented with ingestion from small S3 files as well as Kinesis and SNS/SQS topics. A fun learning throughout this journey was that at this throughput, the sheer number of objects being created itself causes a lot of Java garbage, and the JVM pauses themselves were slowing our systems down. But here, again, the world came to my help: JDK 15 launched a new garbage collector, and we got a 98% improvement in our GC pauses, which again calmed my life down a little bit. Another myth that we broke was around object creation, garbage creation itself. We started to pool even these teeny tiny objects, against the standard wisdom that you should pool only large objects. At this massive throughput, at this massive scale, if you start pooling even the smaller objects, they can benefit you; see if JVM pauses are a pain for you.
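A minimal sketch of that kind of small-object pooling is below: instead of allocating a fresh event object per ad per request, the hot path borrows one from a pool and returns it when done, so the garbage collector has far less to clean up. The `AdScoreEvent` type and its fields are made up for the illustration.

```java
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** A tiny, thread-safe object pool; at very high throughput even small objects can be worth pooling. */
public class SmallObjectPool<T> {

    private final ConcurrentLinkedDeque<T> free = new ConcurrentLinkedDeque<>();
    private final Supplier<T> factory;   // creates a new instance when the pool is empty
    private final Consumer<T> reset;     // clears per-request state before reuse

    public SmallObjectPool(Supplier<T> factory, Consumer<T> reset) {
        this.factory = factory;
        this.reset = reset;
    }

    public T borrow() {
        T obj = free.pollFirst();
        return obj != null ? obj : factory.get();
    }

    public void release(T obj) {
        reset.accept(obj);
        free.offerFirst(obj);
    }
}

/** Hypothetical per-ad scratch object used on the scoring hot path. */
class AdScoreEvent {
    String adId;
    double score;

    void clear() {
        adId = null;
        score = 0.0;
    }
}

class PoolUsageExample {
    private static final SmallObjectPool<AdScoreEvent> POOL =
            new SmallObjectPool<>(AdScoreEvent::new, AdScoreEvent::clear);

    void scoreOneAd(String adId, double score) {
        AdScoreEvent event = POOL.borrow();
        try {
            event.adId = adId;
            event.score = score;
            // ... use the event on the hot path instead of allocating a new one per ad ...
        } finally {
            POOL.release(event);
        }
    }
}
```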
Now, with these successes and some myths broken: we went over the network; we were able to go off-box even for low-latency caches; we were able to parallelize our workflows because we were now more on the network; and we were able to reduce the CPU load on the host. We learned that caches are pretty stable and can be wonderful solutions for high-scale systems. One tip you can use, in case your requirements are that you want consistency over latency: as you read from these multiple replicas, you can establish a quorum then and there, which takes a little more time, but much less than having to go to a data store or database. And that could also speed things up for your business needs. The win is that now my largest cache is serving somewhere about 500 million keys per second. It stores about 250 gigs of data, and the network consumed across these caches is somewhere about 2.5 terabits per second. Parallelization, replication, and sharding enabled all of this. Hot off the press, there is a request coming to us for which we will be onboarding and supporting a cache at 4.5 terabits per second with 7.2 terabytes of data to be cached. And it was possible.
This is where we were more like, "You can do it. There are no problems which will not be solvable. You can do it, go for it." Okay, let's touch back on the transparency problem that we were talking about. All of this was done, things were over the network, but the systems were still opaque. We were not logging much. So we built this ad eventing system. Real-time introspection and debugging have their own rewards. Once you know what is going on in the system, you will be able to innovate on it. You will be able to analyze, you will be able to figure out where you can optimize your systems. Also, the machine learning systems would love this: they would be able to get more and more of this data to build better scores, in our case for ad serving, conversion scores, click-through rate scores, and relevance scores, which further improve our business. The challenge was that there would be hundreds of millions of ads, and we estimated that the data would be generated at about four terabytes per second. At this scale, even serialization was challenging. The CPU was choking just to serialize this much data, and writing to a file was completely off the table.
We took some time. We were naive, we started simple, we loved the complexity. We thought that we could dump all the data on an SNS topic, pick it up from the other side, and everything would work fine. But there were thousands of ads in every request; the 256-kilobyte payload cap of SNS would not work here. Okay, we can change it to Kinesis. Kinesis has a payload size of one megabyte; that should be fine. Well, it was fine for some of our use cases, for the requests where the ads were at a thousand or 2,000 ads per request, but at the tail these were at about 20,000 ads in one request. That was again over the one-megabyte limit. Also, Kinesis started to throttle us, because when you throw these big payloads at Kinesis, hot shards happen and Kinesis will throttle you. Okay, we broke the payload down, and each request was split into multiple chunks. We would shard those onto the Kinesis stream, and then later pick them up from the Kinesis data stream, attach it to an EMR cluster, and then join the data and make it available.
To get the data into S3, we attached the Kinesis data stream to Kinesis Data Firehose, and Firehose would put our data into S3. When we tested, we were able to log 120 terabytes per hour. This had somewhere about 2,500 Kinesis shards running, but it was egregiously expensive, like millions of dollars per month. No. Is it really worth it? Let's go back and look at our requirements. Even if we were able to generate this much data, we are just logging it; somebody would be consuming it, and they would need to process it. How would they even process it? There was something missing out there. We realized that we did not need 100% of the data to be logged. The other realization was that we may not need it to be logged within seconds; you know, tens of seconds should be fine. This is data analytics or machine learning going on; this is not real-time request-response. Again, the Eureka moment kicked in: we simply cut out the middleman. We were already batching in our ad servers, so we started logging that batch directly to S3. We saved 98% of our costs. Now the solution looked okay for the business growth as well. The end-to-end latencies also did not get worse, because at this scale the buffer filled up very fast and we were able to ship it off. The data was available in S3 in about three to five seconds. You can tune the amount of data that you want to batch, so that can vary the latency on how fast you get the data into S3.
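A rough sketch of that "cut out the middleman" batching path is below: the ad server appends serialized events to an in-memory buffer and, when the buffer reaches a size or age threshold, gzips it and writes it straight to S3 as one object. Bucket name, key layout, and thresholds are illustrative assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.UUID;
import java.util.zip.GZIPOutputStream;

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

public class AdEventBatcherSketch {

    private static final int FLUSH_BYTES = 8 * 1024 * 1024;   // flush at ~8 MB (illustrative threshold)
    private static final long FLUSH_INTERVAL_MS = 3_000;      // or every few seconds, whichever comes first

    private final S3Client s3 = S3Client.create();
    private final StringBuilder buffer = new StringBuilder();
    private long lastFlushMs = System.currentTimeMillis();

    /** Called once per ad event on the serving path; newline-delimited records keep serialization cheap. */
    public synchronized void append(String serializedEvent) throws IOException {
        buffer.append(serializedEvent).append('\n');
        boolean bigEnough = buffer.length() >= FLUSH_BYTES;
        boolean oldEnough = System.currentTimeMillis() - lastFlushMs >= FLUSH_INTERVAL_MS;
        if (bigEnough || oldEnough) {
            flush();
        }
    }

    private void flush() throws IOException {
        if (buffer.length() == 0) {
            return;
        }
        // Gzip the whole batch so we write one compressed object instead of millions of tiny records.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(buffer.toString().getBytes(StandardCharsets.UTF_8));
        }
        String key = "ad-events/" + Instant.now() + "-" + UUID.randomUUID() + ".ndjson.gz";
        s3.putObject(PutObjectRequest.builder()
                        .bucket("ad-events-raw")               // hypothetical bucket
                        .key(key)
                        .build(),
                RequestBody.fromBytes(out.toByteArray()));
        buffer.setLength(0);
        lastFlushMs = System.currentTimeMillis();
    }
}
```

The trade-off, as described in the talk, is tunable: a bigger or older buffer means fewer, cheaper writes but slightly later data in S3.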
The end-to-end latency after the whole of the data analytics and machine learning processing was now about 10 minutes before the data was available for the next jobs they had to run. On the other side, we also reduced the amount of data that was to be logged. We started logging much less, only what was relevant to be logged. Different teams came to us and said, "Hey, we want to know why an ad got dropped." Somebody said, "We want to know what scores were generated." Somebody else said they wanted to know where an ad was sourced from. We logged only those specific details for those teams and gave them their data out there. With this, we reduced the total amount of logging to about 1.2 terabytes per hour, and that was somewhat manageable. The costs were far lower, the business was benefiting from all of this, and we were able to launch a successful ad events system. Now, with more and more data generated, the hunger continued to grow. Many teams also copied the solution and started to generate more and more data. We needed a centralized data lake in which we could aggregate all of this data, keep it together, and then use it for the right use cases.
Let's go deeper into our data lake creation now. The first system just centralized the ton of data that we had. It would take a daily dump of our databases, and it would hook into the S3 dumps that other teams were creating. All of the data was cataloged in Glue, and then EMR was processing all of this data. We opened up the user interfaces over Redshift and Athena, and that's how the developers would be able to query it, run analysis on it, and so on. There were wins, not to deny: the system was able to run the jobs in about four to six hours, there were three terabytes of data available, and it was just working fine. But there were challenges. There was always a need for more data. The reaction time to customer behavior had to be reduced: if there is a shopper who's coming in to do something right now, and they've done some events, that data should come back and feed into the machine learning systems more or less immediately; we can't wait four to six hours for that to happen. Developer productivity was getting impacted because over Redshift and Athena, every query used to take about 10 to 15 minutes. Now, developers don't write the query perfectly the first time, so the time was not just to execute that query; it was the time to build that query over several attempts, and each attempt was taking those 10 to 15 minutes. So that was something that we could improve. And lastly, we had an online-offline skew.
What this means is that the data available to the ad server came through real-time streams, whereas the data available to these analytical systems was a daily dump. There was a difference: there might be 10 versions of the data in the online system while only one version got to our analytical systems. That tripped up a bunch of analytics, and that tripped up a bunch of machine learning systems themselves. Those had to be solved. With more and more teams wanting to put their data into S3, it was only a matter of time before S3 started to throttle us. The write rates went up higher and higher, and S3 capped us at 3,500 writes per second; with more complicated jobs running, S3 capped us on reads at 5,500 reads per second, if I remember correctly. The solution was that we added random prefixes to the S3 keys and started writing across 16 different buckets out there. Since all the data was still cataloged in Glue, there was no performance degradation from this.
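The prefixing trick can be as simple as the sketch below: derive a short prefix per object (randomly, or by hashing the logical key as done here) and spread writes across that many S3 prefixes or buckets so no single partition takes the whole request rate. The prefix width and naming are illustrative.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class S3KeyPrefixer {

    /**
     * Spreads S3 writes across numPrefixes partitions by hashing the logical key.
     * e.g. "ad-events/2018-11-28/host-17.gz" -> "07/ad-events/2018-11-28/host-17.gz"
     */
    public static String prefixedKey(String logicalKey, int numPrefixes) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(logicalKey.getBytes(StandardCharsets.UTF_8));
            // Use the first byte of the hash to pick one of numPrefixes partitions.
            int partition = Math.floorMod(digest[0], numPrefixes);
            return String.format("%02x/%s", partition, logicalKey);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```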
But again, in time, Glue started to slow down. There was more and more data, there were larger chunks to be read through in those batch jobs, more and more S3 buckets had to be pulled in for job processing, and Glue started to slow us down. What we realized was the need for a search system over this data. Glue is basically supporting EMR by figuring out which are the right S3 buckets that need to be pulled up and given to EMR for the MapReduce jobs that run there. So we built a search solution for the data. We had an orchestrating system created which would listen to the jobs fired on EMR. It would then go search in Elasticsearch and the Glue catalog to figure out which S3 files to pick up, which S3 buckets to pick up, and it would go pick those specific ones. This removed our bottleneck on Glue. It also reduced our overall delays in job processing manifold. Now, there was still a problem with the overall four to six hours the jobs were taking. This manifests in two forms. One is from the data ingestion side itself: if the data is being generated as a daily dump, of course at least a day will be needed to get the data, and only after that can something else be executed.
We augmented our ingestion mechanisms to also allow ingestion using Kinesis, Kafka, SNS, and SQS. The other part of this slowness was coming from the EMR jobs that were slowing things down. There, we augmented our EMR with Apache Spark, and that again reduced the load times. It was able to parallelize the workflows much more efficiently, and we were able to build on it. We also realized that the delay was there because the whole system was architected as batch jobs. Now that the ingestion was fast, you could run a job as soon as the data was available out there, but that job would process the data and make it ready for the next job. Then that one would process the data, transform it, join it, do whatever, and make it available for the next job, and then the next, and then the next. So if there are four or five iterations like this, there is still a delay. Using Kinesis and Kafka, what we were able to get to was that real-time joins and analytics were possible. It is a little bit expensive, but depending on your use cases, you can figure out which ones you want to pay for.
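As a flavor of the kind of near-real-time analytics this enables, here is a small Spark Structured Streaming sketch that reads ad events off Kafka and keeps per-campaign click counts over one-minute windows. The topic name, brokers, and event schema are assumptions; the real pipelines are considerably richer.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;
import static org.apache.spark.sql.functions.window;

public class AdEventStreamingSketch {

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("ad-event-analytics").getOrCreate();

        // Assumed JSON event schema: {"campaignId": "...", "eventType": "click", "eventTime": "..."}
        StructType schema = new StructType()
                .add("campaignId", "string")
                .add("eventType", "string")
                .add("eventTime", "timestamp");

        Dataset<Row> events = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "kafka.internal.example:9092") // hypothetical brokers
                .option("subscribe", "ad-events")                                 // hypothetical topic
                .load()
                .selectExpr("CAST(value AS STRING) AS json")
                .select(from_json(col("json"), schema).alias("e"))
                .select("e.*");

        // Per-campaign click counts over 1-minute windows, tolerating 10 minutes of late data.
        Dataset<Row> clicksPerMinute = events
                .filter(col("eventType").equalTo("click"))
                .withWatermark("eventTime", "10 minutes")
                .groupBy(window(col("eventTime"), "1 minute"), col("campaignId"))
                .count();

        clicksPerMinute.writeStream()
                .outputMode("update")
                .format("console")       // in practice this would feed dashboards or a downstream store
                .start()
                .awaitTermination();
    }
}
```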
To improve the developer productivity part of it, we also indexed some of the data into Apache Druid and Apache Superset, which was able to give nearly real-time responses to the developers. Once their queries were built quickly there and they wanted to query the whole data set, they could fire that same query on Redshift and Athena and get their results faster. With these successes, we now had petabytes of data available. We could offer about 15 minutes of overall end-to-end data ingestion latency. Real-time joins and analytics were possible with Kinesis and Kafka; of course, they were a little bit costly, repeating that. To reduce our costs, we employed S3 tiered storage: the data that was not being used, or the data that got old, was put into colder S3 tiers, which are much cheaper. Another optimization was that we reused our EMR clusters. EMR charges you even during boot time, and by reusing clusters, we avoided paying the boot time of spinning up an EMR cluster multiple times, and that also improved our costs tremendously.
All this data proved gems and jewels for our machine learning systems, our analytics improved, and it was fun. Just to reiterate, before I hand over to Kun, some of the salient points that we learned through our journey: focus on not letting your system be boxed in a box, and continue to scale horizontally. The world is out there solving some of these problems for us; we don't need to solve every problem ourselves. Pick up the latest solutions and keep yourself abreast of the latest technologies out there. Don't be naive and fall in love with complexity. It's a common trait that engineers want to solve big problems, but try to solve the problems which actually get your business benefits, get your solutions out there. And lastly, having a good system is much better than holding out for a perfect system. Thank you so much. (audience applauds) Thank you. Hey, everyone. Thank you, Anuj, for covering our ad serving architecture. My name is Kun. I'm a Director of Sponsored Products ads at Amazon. In the remaining session, let's talk about machine learning. At Amazon, our vision is to build the most customer-centric ads program on earth.
And in order to do that, we deliver machine learning throughout the entire life cycle of ads, so we can better understand the queries and products, we can predict how likely a shopper is to click on and purchase our products, and we can further deliver a more relevant and engaging ad experience that matches advertisers to shoppers' intent, right? So here, this gives a very high-level overview of our ads flow, as you can see. When a shopper, like any of you, comes to Amazon and starts searching for something, behind the scenes many things happen. First, we use information retrieval and deep learning embedding techniques to identify the potential ads that we can show to you with respect to your query. This may come up with a few thousand ad candidates. And then, after that, we use deep learning inferencing to estimate how likely you, as a user, are to click on each of these ad candidates and how likely you are to purchase. In addition to that, we also try to understand and estimate the relevance between each ad candidate and your query. And then finally, we put all this information, our estimations, into the next stage, which we call the allocation and pricing stage, which is nothing but a multi-objective optimization, or in other words, an auction. We try to optimize the auction efficiency subject to the constraint that we want to make sure that ads are relevant and engaging, and can optimize the downstream long-term benefit for shoppers, advertisers, and Amazon. We also track shoppers' engagement with all these ads, such as impressions, clicks, and purchases, and all of this information is logged and fed back to our model training stage as the next iteration, so we can continue to improve our relevance, prediction, and optimization models and continue to deliver a more relevant, engaging experience.
For the remaining section of the talk, I'm gonna cover the machine learning infrastructure: how we build ultra-low-latency, high-throughput machine learning infrastructure to empower this science innovation. The biggest challenge we have to deal with is scale, because every second we have to deal with a few hundred thousand ad requests, and for each such request we have to score thousands of ads within 20 milliseconds at P99. In the meantime, we also have to host a few hundred models for different marketplaces, different product categories, and different traffic segments, which may have very different hardware and software requirements. And in the meantime, we have to keep our hardware costs under control, right? To tackle all these challenges, we started by building a deep learning inferencing service on CPUs. We started with open-source BLAS, the Basic Linear Algebra Subprograms optimization libraries, and did our in-house optimizations to meet the latency requirements. And this solution initially worked very well.
It handled multi-layer perceptron networks with three to five layers of complexity and somewhere from under 100 million to a few hundred million parameters. It also covered 100% of traffic in real time, because for every request we could handle the deep learning inferencing for all the thousands of ads within 20 milliseconds, which worked very well. However, it could not scale to even more complex deep learning models with attention mechanisms or transformers, which we need in order to better understand the query, the product, and the shopper's intent. That was our initial solution. Then, to deal with more complex models, we started building something more: asynchronous inferencing. The idea is to pre-compute. We pre-computed the scores from query to ad products and put them into a lookup table, like a cache, for real-time lookup. When there's a cache miss, we fall back to a much simpler model for real-time inferencing, but in the meantime we kick off asynchronous computation behind the scenes with the more complex deep learning models, so we can update the cache for future lookups. And this solution, as you can see, can support very complex deep learning models because it doesn't require real-time inferencing. However, there's still a challenge with the cardinality of the keys.
Consider that on Amazon we have billions of queries and hundreds of millions of products, right? If you combine them all together as your key, it's very obvious that the solution is very effective for head and torso traffic, but not necessarily for long-tail traffic. To get the benefit of both solutions, we built a hybrid serving infrastructure. At the core, as you can see in the diagram, is the latency budget estimator. The idea is that everything is built on CPUs as usual, but this latency budget estimator estimates in real time how much budget we have left for inferencing. If we have enough budget, we go to real-time inferencing with the deep learning models we can handle in real time. If we do not have enough budget, we go to asynchronous inferencing through a real-time lookup, right? This latency budget solution serves as the central point for us to decide which path, which infrastructure, even which model we want to go for. And this solution actually gives us the benefit of both real-time and asynchronous inferencing altogether.
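In spirit, the budget-driven routing can be sketched like this: measure how much of the per-request scoring budget is left, estimate the cost of the heavy real-time model, and fall back to the pre-computed lookup (and then a simple model) when the budget will not cover it. The 20 ms figure follows the talk; the interfaces and everything else below are illustrative assumptions.

```java
import java.util.Optional;

/** Hypothetical scoring interfaces; a real system would talk to a model server and a score cache. */
interface HeavyModelClient { double score(String query, String adId); }        // complex DNN, slower
interface SimpleModelClient { double score(String query, String adId); }       // cheap fallback model
interface PrecomputedScoreCache { Optional<Double> lookup(String query, String adId); }

public class LatencyBudgetRouterSketch {

    private static final long SCORING_BUDGET_NANOS = 20_000_000L;  // ~20 ms per request, per the talk

    private final HeavyModelClient heavyModel;
    private final SimpleModelClient simpleModel;
    private final PrecomputedScoreCache cache;
    private final long estimatedHeavyCallNanos;                    // rolling estimate of the heavy model's cost

    public LatencyBudgetRouterSketch(HeavyModelClient heavyModel, SimpleModelClient simpleModel,
                                     PrecomputedScoreCache cache, long estimatedHeavyCallNanos) {
        this.heavyModel = heavyModel;
        this.simpleModel = simpleModel;
        this.cache = cache;
        this.estimatedHeavyCallNanos = estimatedHeavyCallNanos;
    }

    public double score(String query, String adId, long requestStartNanos) {
        long elapsed = System.nanoTime() - requestStartNanos;
        long remainingBudget = SCORING_BUDGET_NANOS - elapsed;

        // Enough budget left: pay for the complex model in real time.
        if (remainingBudget > estimatedHeavyCallNanos) {
            return heavyModel.score(query, adId);
        }
        // Otherwise: serve the asynchronously pre-computed score if we have one...
        Optional<Double> cached = cache.lookup(query, adId);
        if (cached.isPresent()) {
            return cached.get();
        }
        // ...and fall back to the cheap model on a cache miss.
        return simpleModel.score(query, adId);
    }
}
```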
However, as we entered 2022, to further model the higher-order interactions between shoppers, advertisers, products, shoppers' intent and interest, and context such as hour of the day, day of the week, position of the ad, and other contextual information, our models became even more complex, as much as 10x more complex than before, going from a few hundred million parameters to billions of parameters, which require far more floating-point operations. As a result, the latency became prohibitively high. We could not inference these models using our traditional solution on CPUs anymore, because the latency could easily go up from 20 milliseconds to one second for a single deep learning inference, which does not work for us at all, right? And asynchronous computation was not working either, because now we have to consider many more dimensions of the models, from shoppers to advertisers to products to context like position and device. If you put all this information together, the cardinality of the key is explosive; we cannot pre-compute at a very granular level anymore either, right? So what's the solution? GPUs. We started building GPU inferencing on Amazon ECS.
The solution was to start integrating the NVIDIA Triton Inference Server with the TVM deep learning compiler, and on top of that we applied additional optimizations to further improve our latency and throughput. For example, we introduced additional duplicate requests and took the faster responses, so we can optimize tail latency. We also compressed our feature payload to reduce network communication latency. To improve throughput, we tuned our model parallelism as much as possible so we can invoke many model instances on NVIDIA Triton. And in the meantime, we tried to reduce the CUDA API costs of copying data from CPUs to GPUs and the cost of invoking the kernels, because the more calls you make and the more data you copy, the more the throughput suffers. By doing all these in-house optimizations on top of NVIDIA Triton, we were able to serve very complex models that truly understand the shopper's intent and interest. Think about this: if you are searching for a shampoo, without specifying any additional requirement, the model on Amazon today can understand whether you're looking for a specific brand, or a specific color, or a specific ingredient or flavor. So that's what the more complex models and parameters can help us deliver.
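One common way to implement the duplicate-request idea mentioned above is request hedging: fire the same inference call at two replicas and take whichever answer comes back first, which trims the tail without changing the model at all. The sketch below is generic Java with a hypothetical `InferenceClient` interface; it is not the actual Triton client code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical client for a single inference replica (e.g. one model-server endpoint). */
interface InferenceClient {
    float[] score(float[] features);
}

public class HedgedInferenceSketch {

    private final InferenceClient replicaA;
    private final InferenceClient replicaB;
    private final ExecutorService executor = Executors.newCachedThreadPool();

    public HedgedInferenceSketch(InferenceClient replicaA, InferenceClient replicaB) {
        this.replicaA = replicaA;
        this.replicaB = replicaB;
    }

    /** Sends the same request to both replicas and returns whichever finishes first. */
    public float[] score(float[] features, long timeoutMillis) throws Exception {
        CompletableFuture<float[]> a =
                CompletableFuture.supplyAsync(() -> replicaA.score(features), executor);
        CompletableFuture<float[]> b =
                CompletableFuture.supplyAsync(() -> replicaB.score(features), executor);

        // The slower call is simply ignored; a stricter version would also cancel it.
        return a.applyToEither(b, result -> result)
                .get(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

Hedging doubles request volume, so in practice it is usually applied only after a short delay or only to a slice of traffic.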
This works for a few models with more complex parameters and structure, but we want to scale even more. We want to deliver this kind of model for different marketplaces and for different product categories, like shoes, electronics, books, toys, right? How can we scale to many more such models? Our traditional monolithic service, as Anuj mentioned, does not work anymore, because each model may require very different hardware and software optimization strategies in order for us to serve it in real time, right? So the solution for us was to start building a microservice inferencing architecture. The idea is very simple: partition the model into shards and create a microservice for each shard, and build a data plane that consists of all of these different microservices. Now, given the model is partitioned and served with different microservices, we can try different configurations of hardware and software for different models. We further built these microservices on top of AWS App Mesh. With App Mesh, we can automatically handle, behind the scenes, the lower-level inter-service communication, including load balancing, routing, and circuit breaking. Now each service can have very different hardware and software configurations. If you look at the diagram in the upper right corner, we have a deep neural network which is served on the NVIDIA Triton server on G4 machines, but in the lower right corner we have another BERT model that is served with the SageMaker multi-model server on Inferentia chips. So that's the power of this microservice architecture: it allows us to try very different strategies in terms of hardware and software.
Okay, so in addition to this data plane, we also built a control plane. The idea is to use this control plane to enable configuration-driven model, service, and fleet management. If you look at the upper right corner, with this control plane the model owner can just write some configuration, as you can see in the table in the upper right corner, about the model artifacts, the instance type, the runtime, and the software and hardware requirements. The control plane operates as a control loop, constantly monitoring this configuration against the configuration of the data plane to ensure they match. So whenever you add a new configuration, the system will ensure a new microservice is up and running within just a few minutes, without any code deployment required, right? This control plane also reacts to new model artifacts. If you look at the table in the lower right corner, whenever the offline machine learning pipeline has trained and published a new model, the configuration will change. The control plane monitors and listens to this change and deploys the new model artifacts into the system, so we can pick up new models instantly.
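The reconciliation idea behind that control plane can be sketched as a simple control loop: compare the desired configuration with what is actually running, and create, update, or remove inference microservices until they match. The `DesiredModelConfig` record, the `FleetApi` interface, and the single-pass structure below are all hypothetical; the real control plane obviously does far more.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical desired state written by a model owner: where and how a model should run. */
record DesiredModelConfig(String modelArtifactUri, String instanceType, String runtime) {}

/** Hypothetical API over the data plane (the fleet of inference microservices). */
interface FleetApi {
    Map<String, DesiredModelConfig> runningServices();           // model id -> currently deployed config
    void deploy(String modelId, DesiredModelConfig config);      // create or replace a microservice
    void remove(String modelId);                                 // tear a microservice down
}

public class ControlPlaneLoopSketch {

    private final FleetApi fleet;

    public ControlPlaneLoopSketch(FleetApi fleet) {
        this.fleet = fleet;
    }

    /** One reconciliation pass; in practice this runs continuously on a schedule. */
    public void reconcile(Map<String, DesiredModelConfig> desired) {
        Map<String, DesiredModelConfig> running = fleet.runningServices();

        // Deploy anything that is desired but missing, or whose configuration has drifted.
        for (Map.Entry<String, DesiredModelConfig> entry : desired.entrySet()) {
            DesiredModelConfig current = running.get(entry.getKey());
            if (current == null || !current.equals(entry.getValue())) {
                fleet.deploy(entry.getKey(), entry.getValue());
            }
        }
        // Remove anything running that is no longer in the desired configuration.
        for (String modelId : Set.copyOf(running.keySet())) {
            if (!desired.containsKey(modelId)) {
                fleet.remove(modelId);
            }
        }
    }
}
```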
Okay, so what have we learned so far by building all this machine learning infrastructure? First of all, one solution does not fit all, right? We talked about real-time inferencing and we talked about asynchronous inferencing. A dynamic latency budget estimator is probably a good way to decide, in real time, which solution you want to use: whether you go real-time with one architecture, or asynchronous computation with another architecture. This helps us make these decisions in a more intelligent fashion. Secondly, you want to build an open architecture, not a monolithic one, so you can integrate with more advanced, customized hardware and software in the near future, and you do not have to wait for a redesign of the entire system to try all those new solutions. Thirdly, inferencing optimization is full-stack work, which covers the software and the third-party libraries down to the hardware. You may want to choose different model servers, such as NVIDIA Triton or the SageMaker multi-model server, for different throughput or latency requirements for your jobs. You may also want to choose different deep learning optimization libraries, from TensorRT to Apache TVM, for different workloads. And finally, you may also have to choose different hardware, such as R5, P3, or even G4, for different compute-heavy or throughput-heavy jobs. Putting all this together, as of today, we are able to operate at Amazon with very complex deep learning models with billions of parameters, so we can truly understand shoppers' intent and product characteristics and deliver a more relevant and engaging ad experience. At the least, if you feel something isn't right with the ads you're looking at on Amazon, you have one of the POCs, like me, to blame. Okay, so this completes everything we wanted to cover about Amazon Ads in today's talk. If you're interested in working on any of these very complex, interesting, and exciting problems, please know that we're hiring across all job families: scientists, engineers, managers, product managers. Feel free to reach out to any of the speakers like us offline. Thank you very much for your attention. (audience applauds)