Актер Scala: receiveWithin() не получает сообщения

Я создаю службу на основе акторов в Scala, где потребители могут запрашивать, авторизованы ли клиенты, а также могут авторизовать клиентов.

Если потребитель запрашивает состояние авторизации клиента, а этот клиент еще не авторизован, субъект должен дождаться входящих Authorize сообщений в течение заданного времени ожидания, а затем отправить ответ. IsAuthorized должна иметь возможность выполняться синхронно в коде потребителей, чтобы он блокировался и ждал ответа. Что-то типа

service !? IsAuthorized(client) => {
  case IsAuthorizedResponse(_, authorized) => // do something
}

Однако receiveWithin() в моем актере никогда не получает сообщения и всегда упирается в тайм-аут.

Вот мой код

case object WaitingForAuthorization
case class WaitingForAuthorizationResponse(clients: immutable.Set[Client])
case class IsAuthorized(client: Client)
case class IsAuthorizedResponse(client: Client, authorized: Boolean)
case class Authorize(client: Client)

class ClientAuthorizationService {
  private val authorized: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client]
  private val waiting: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client]

  def actor = Actor.actor {
    loop {
      react {
        case IsAuthorized(client: Client) => reply {
          if (authorized contains client) {
            IsAuthorizedResponse(client, true)
          } else {
            waiting += client
            var matched = false;
            val end = Instant.now.plus(ClientAuthorizationService.AUTH_TIMEOUT)

            while (!matched && Instant.now.isBefore(end)) {
              // ERROR HERE: Never receives Authorize messages
              receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
                case Authorize(authorizedClient: Client) => {
                  authorizeClient(authorizedClient)
                  if (authorizedClient == client) matched = true
                }
                case TIMEOUT => // do nothing since we handle the timeout in the while loop
              }
            }

            IsAuthorizedResponse(client, matched)
          }
        }

        case Authorize(client: Client) => authorizeClient(client)
        case WaitingForAuthorization => reply {
          WaitingForAuthorizationResponse(immutable.Set() ++ waiting)
        }
      }
    }
  }

  private def authorizeClient(client: Client) = synchronized {
    authorized += client
    waiting -= client
  }
}

object ClientAuthorizationService {
  val AUTH_TIMEOUT: Long = 60 * 1000;
}

Когда я отправляю сообщение Authorize актеру, когда он находится в блоке receiveWithin, сообщения перехватываются вторым оператором case ниже, который фактически должен перехватывать эти сообщения только тогда, когда никто не ждет ответа в это время.

Что не так с моим кодом?

Обновление:

Вот сокращенная версия соответствующего кода, который на самом деле представляет собой гораздо более простую и другую логику, но, возможно, лучше проясняет проблему:

loop {
  react {
    case IsAuthorized(client: Client) => reply {
      var matched = false

      // In the "real" logic we would actually loop here until either the
      // authorized client matches the requested client or the timeout is hit.
      // For the sake of the demo we only take the first Authorize message.

      receiveWithin(60*1000) {
        // Although Authorize is send to actor it's never caught here
        case Authorize(authorizedClient: Client) => matched = authorizedClient == client
        case TIMEOUT => 
      }

      IsAuthorizedResponse(client, matched)
    }

    case Authorize(client: Client) => // this case is hit
  }
}

Обновление 2:

Я наконец решил проблему. Я думаю, проблема заключалась в том, что актор блокировался при попытке получить сообщение Authorize в ответе на предыдущее сообщение IsAuthorized.

Я переписал код так, чтобы анонимный Актер запускался, когда мы ждем Authorized. Вот код для тех, кому интересно. waiting это Map[Client, Actor].

loop {
  react {
    case IsAuthorized(client: Client) =>
      if (authorized contains client) {
        sender ! IsAuthorizedResponse(client, true)
      } else {
        val receipient = sender
        // Start an anonymous actor that waits for an Authorize message
        // within a given timeout and sends a reply to the consumer.
        // The actor will be notified by the parent actor below.
        waiting += client -> Actor.actor {
          val cleanup = () => {
            waiting -= client
            exit()
          }

          receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
            case Authorize(c) =>
              receipient ! IsAuthorizedResponse(client, true)
              cleanup()
            case TIMEOUT =>
              receipient ! IsAuthorizedResponse(client, false)
              cleanup()
          }
        }
      }

    case Authorize(client: Client) =>
      authorized += client

      waiting.get(client) match {
        case Some(actor) => actor ! Authorize(client)
        case None =>
      }

    case WaitingForAuthorization => sender ! WaitingForAuthorizationResponse(immutable.Set() ++ waiting.keySet)
  }
}

Если есть лучшие способы решить эту проблему, пожалуйста, дайте мне знать!


person Sven Jacobs    schedule 11.10.2011    source источник
comment
Можете ли вы очистить/сократить код и опубликовать только соответствующие части?   -  person Jus12    schedule 11.10.2011


Ответы (2)


Проблема не в ответе? В

case IsAuthorized(client: Client) => reply { ... }

весь код находится в аргументе блока ответа, поэтому он выполняется (включая receiveWithing) до фактической отправки ответа. Это означает, что когда ваш клиент обработает ваш ответ, вы больше не будете его ждать.

В вашем исходном коде это, вероятно, должно быть что-то вроде

case IsAuthorized(client: Client) =>
  if(ok) reply(AuthorizedReply(client, true))
  else {
     reply(AuthorizedReply(client, false))
     receiveWithin(...)
  }
person Didier Dupont    schedule 11.10.2011
comment
поэтому он выполняется (включая receiveWithing) до фактической отправки ответа. Это именно то, что я хочу :-) Ответ должен блокироваться до тех пор, пока клиент не будет авторизован или не истечет время ожидания. Этот код вдохновлен этим примером здесь (см. третий фрагмент кода главы Второй пример). Ваш код также неверен, потому что, если клиент еще не авторизован, вы немедленно отправите ответ с ложью, а затем будете ждать входящих сообщений авторизации. - person Sven Jacobs; 11.10.2011

Я наконец решил проблему. Я думаю, проблема заключалась в том, что актор блокировался при попытке получить сообщение Authorize в ответе на предыдущее сообщение IsAuthorized.

Я переписал код так, чтобы анонимный Актер запускался, когда мы ждем Authorized. Вот код для тех, кому интересно. waiting это Map[Client, Actor].

loop {
  react {
    case IsAuthorized(client: Client) =>
      if (authorized contains client) {
        sender ! IsAuthorizedResponse(client, true)
      } else {
        val receipient = sender
        // Start an anonymous actor that waits for an Authorize message
        // within a given timeout and sends a reply to the consumer.
        // The actor will be notified by the parent actor below.
        waiting += client -> Actor.actor {
          val cleanup = () => {
            waiting -= client
            exit()
          }

          receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
            case Authorize(c) =>
              receipient ! IsAuthorizedResponse(client, true)
              cleanup()
            case TIMEOUT =>
              receipient ! IsAuthorizedResponse(client, false)
              cleanup()
          }
        }
      }

    case Authorize(client: Client) =>
      authorized += client

      waiting.get(client) match {
        case Some(actor) => actor ! Authorize(client)
        case None =>
      }

    case WaitingForAuthorization => sender ! WaitingForAuthorizationResponse(immutable.Set() ++ waiting.keySet)
  }
}

Если есть лучшие способы решить эту проблему, пожалуйста, дайте мне знать!

person Sven Jacobs    schedule 11.10.2011