Chaining Operations in Swift with FlatMap

In situations where you need to complete a series of operations in a synchronous manner, Apple's reactive framework Combine makes it possible to chain one request after another by using the flatMap method on any publisher.

In regard to Combine, flatMap makes it so you can take the result of one operation, assume it's successful, and use that result to launch a subsequent request. It sounds easy enough, but actually becomes quite complicated when working with real-world situations.

To help demonstrate a situation I ran into for my app, we will be checking if a user is currently signed in, then attempt to get that user's authentication information, check if there is an existing user in the database, and create one if the user doesn't exist yet. The examples used here are going to use AWS Amplify, but the concepts can be applied to any operation APIs that support Combine whether they are regular networking requests or use a third-party SDK.

Let's start with a basic API that simply returns whether a user is signed in or not and handle's logic in the sink.

// 1
Amplify.Auth.fetchAuthSession()
    .resultPublisher
    // 2
    .map(\.isSignedIn)
    // 3
    .sink { completion in
        switch completion {
        case .failure(let error):
            print("All errors are handled here: \(error)")
            
        case .finished:
            print("Publisher stopped. No more values")
        }
    // 4
    } receiveValue: { isSignedIn in
        print("User signed in: \(isSignedIn)")
    }
    // 5
    .store(in: &tokens)

Let's break down everything that's going on in the snippet above to make sure we're on the same page so far:

  1. The fetchAuthSession call checks for existing sessions to see if the user has already signed into the app and is still signed in.
  2. map is then called to take the success result of fetchAuthSession and only pass the value from the property isSignedIn down the stream, meaning we are only working with a Bool at this point and not a result object.
  3. The first block of a sink is where the completion of the publisher can be observed. Any errors that happen in the logic before the sink will be passed here.
  4. The second block of a sink is where you can write logic for the end result value because this code will only be ran if an error wasn't thrown prior to the sink.
  5. Publisher tend to be asynchronous which means the logic in the sink is ran at a different time than when the function is called. To ensure that our sink logic is called when it's supposed to be, we keep the sink in memory by storing the token (AnyCancellable) in a property. For this case, it's being stored in tokens: Set<AnyCancellable>.

We can modify the snippet above by simply replacing the .map(\.isSignedIn) with the following code snippet:

.flatMap { session in
    Just(session.isSignedIn)
}

This does essentially the same exact thing as map. Generally you should use map over flatMap in a situation like this since flatMap requires you to return a Publisher, which is why we wrap the Bool value in Just. Just is a Publisher that simply streams the value used to intialize it and was probably created specifically to enable flatMaps to work.

Before continuing, we will need to create a custom Error type that will allow us to consildate all possible errors thrown by the flatMap to be of the same type. Create the following enum:

enum MyAuthError: Error {
    case amplify(AmplifyError)
    case noSession
}

Notice that one case accepts an AmplifyError object. This will make it so any error thrown by Amplify can be converted to this case since all Amplify errors conform to the AmplifyError protocol. We will be using this error type shortly.

The next step in our series of operations is to determine whether the user is signed in, and if they are, pass along the AuthUser provided by the Amplify library. Update the flatMap code to look like the following snippet:

.flatMap { session in
    // 1
    if session.isSignedIn, let authUser = Amplify.Auth.getCurrentUser() {
        return Just(authUser)
            // 2
            .setFailureType(to: MyAuthError.self)
            // 3
            .eraseToAnyPublisher()
    } else {
        // 4
        return Fail(error: MyAuthError.noSession)
            // 5
            .eraseToAnyPublisher()
    }
}
  1. The important part about this line is that we're writing some logic that will essentially return two different types of results. In this code, if the session is signed in and we're able to get the current AuthUser object then we can return that type below.
  2. Again, Just is for creating a simple Publisher and passing that value down stream. Because of it's simplicity, it doesn't have an error (or failure) type and that must be set manually to keep the return types of the .flatMap consistent.
  3. Any modifiers added to Publishers wrap the initial Publisher in the new type of Publisher making it nearly impossible to have matching publishers. eraseToAnyPublisher takes the success type and failure type and make the Publisher simple again. This one results in the type AnyPublisher<AuthUser, MyAuthError>
  4. Fail is very similar to Just, but it is used to throw errors as opposed to pass values. Throwing a fail will skip all subsequent logic following the current flatMap block and ends the stream with an error being provided in the sink.
  5. Again, we need to return a consistent type and eraseToAnyPublisher makes that possible.

Now if you're trying this out in Xcode, you will receive an error:

No exact matches in call to instance method 'flatMap'

This is such a hard error to work with because it doesn't provide any context on what's wrong and no suggestions are provided. Essentially, Xcode can't infer what type is supposed to be returned and that's causing this error to pop up right now.

Update the first line of .flatMap to look like the following:

.flatMap { session -> AnyPublisher<AuthUser, MyAuthError> in

When working with flatMap it will make your life much easier if you get used to explicitly writing out the return type of the flatMap block.

At this moment, there is still a similar error to before, but the problem is slightly different this time. The flatMap block is returning a different error than the initial Publisher of fetchAuthSession which has an error type of AmplifyError. To fix this problem, add the following line directly above the .flatMap line:

.mapError(MyAuthError.amplify)

This line solves the problem because we have mapped the error from fetchAuthSession to an error type that matches the flatMap failure type, MyAuthError.

Now that we have been able to successfully extract the AuthUser which contains information like user ID and username, we can use that information to make a query to check if there is a User object in our database that matches the AuthUser. Based on that query's results, we will either use the existing User or create and save a new one.

Add another flatMap just below the first flatMap:

.flatMap { authUser -> AnyPublisher<User, MyAuthError> in
    Amplify.DataStore.observeQuery(for: User.self, where: User.keys.id == authUser.userId)
        .mapError(MyAuthError.amplify)
        .first()
        .map(\.items)
}

Adding the snippet above to your code will result in an incompatibility error since the return type of the closure (AnyPublisher<User, MyAuthError>) and the return type of observeQuery (AnyPublisher<[User], MyAuthError>) are different. Since observeQuery alone doesn't give us enough information to determine whether a User object exists or needs to be created, we can add a nested flatMap after the .map(\.items). Add the following code:

// 1
.flatMap { users -> AnyPublisher<User, MyAuthError> in
   // 2
   if users.isEmpty {
       let newUser = User(id: authUser.userId, username: authUser.username)
       return Amplify.DataStore.save(newUser)
           .mapError(MyAuthError.amplify)
           .eraseToAnyPublisher()
   // 3
   } else {
       return Just(users.first!)
           .setFailureType(to: MyAuthError.self)
           .eraseToAnyPublisher()
   }
}
.eraseToAnyPublisher()
  1. The nested flatMap has the same return type as the parent flatMap, keeping everything consistent.
  2. If the users array is empty, a new object will be saved to the database. The .save method returns a single Model and the error is mapped to MyAuthError, keeping the return type consistent.
  3. If the users array is not empty, just return the first user and set the failure type to match the flatMap return type.

An that's it! You have successfully chained multiple asynchronous operations in a synchronous manner using Combine. Your final result will look something like this:

Amplify.Auth.fetchAuthSession()
   .resultPublisher
   .mapError(MyAuthError.amplify)
   .flatMap { session -> AnyPublisher<AuthUser, MyAuthError> in
       if session.isSignedIn, let authUser = Amplify.Auth.getCurrentUser() {
           return Just(authUser)
               .setFailureType(to: MyAuthError.self)
               .eraseToAnyPublisher()
       } else {
           return Fail(error: MyAuthError.noSession)
               .eraseToAnyPublisher()
       }
   }
   .flatMap { authUser -> AnyPublisher<User, MyAuthError> in
       Amplify.DataStore.observeQuery(for: User.self, where: User.keys.id == authUser.userId)
           .mapError(MyAuthError.amplify)
           .first()
           .map(\.items)
           .flatMap { users -> AnyPublisher<User, MyAuthError> in
               if users.isEmpty {
                   let newUser = User(id: authUser.userId, username: authUser.username)
                   return Amplify.DataStore.save(newUser)
                       .mapError(MyAuthError.amplify)
                       .eraseToAnyPublisher()
                   
               } else {
                   return Just(users.first!)
                       .setFailureType(to: MyAuthError.self)
                       .eraseToAnyPublisher()
               }
           }
           .eraseToAnyPublisher()
   }
   .sink { completion in
       switch completion {
       case .failure(let error):
           print("All errors are handled here: \(error)")
           
       case .finished:
           print("Publisher stopped. No more values")
       }
   } receiveValue: { user in
       print("Do something with User from database \(user)")
   }
   .store(in: &tokens)