examples 用Akka typed actor实现断点下载(暂停、重试)

分享 未结
1 1 2 264
梦境迷离 2020-05-12发布
收藏 点赞

第一次用typed,比较丑陋。

1.启动

package io.github.dreamylost.impl.actor

import java.io.RandomAccessFile
import java.net.URL

import akka.actor.typed.{ ActorSystem, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import com.typesafe.scalalogging.LazyLogging
import io.github.dreamylost.Constants
import io.github.dreamylost.Constants._
import io.github.dreamylost.FileDownload.getDownloadFileName
import io.github.dreamylost.FileUtils.usingIgnore
import io.github.dreamylost.impl.actor.FileDownloadActor.{ DownloadTask, FileTask, ShutdownSystem, StartDownloadTask }

/**
 * 开启下载任务
 *
 * @author liguobin@growingio.com
 * @version 1.0,2020/5/12
 */
object FileDownloadActorMain extends LazyLogging with App {

  //删除临时文件
  clearTempFiles()
  //下载一个
  download(Constants.DEFAULT_FILE_URLS.head)

  //没有lazy会先获得0值
  //https://dreamylost.cn/scala/Scala-%E5%8F%98%E9%87%8F%E5%88%9D%E5%A7%8B%E5%8C%96%E9%A1%BA%E5%BA%8F.html
  private[this] lazy val threadCount: Long = Constants.threadCount
  private[this] var blockSize: Long = _

  lazy val system: ActorSystem[FileTask] = ActorSystem(FileDownloadActorMain(), "file-download-system-actor")

  //主actor
  def apply(): Behavior[FileTask] =
    Behaviors.setup { context =>
      Behaviors.receiveMessage {
        case ShutdownSystem(msg) =>
          logger.error(msg.getOrElse("None"))
          system.terminate()
          Behaviors.stopped
        case StartDownloadTask(actorId, start, totalSize, startPos, endPos, url) =>
          val actor = context.spawn(FileDownloadActor(), s"download-file-actor-$actorId")
          val replyTo = context.spawn(FileDownloadAuditActor(), s"download-audit-actor-$actorId")
          actor ! DownloadTask(actorId, start, totalSize, startPos, endPos, url, replyTo)
          Behaviors.same

      }

    }

  def download(fileUrl: String): Unit = {
    val url = new URL(fileUrl)
    val connection = url.openConnection().asHttpUrlConnection()
    val fileName = getDownloadFileName(fileUrl)
    if (connection.getResponseCode == 200) {
      val fileLength = connection.getContentLength
      usingIgnore(new RandomAccessFile(fileName, "rwd")) {
        randomAccessFile => randomAccessFile.setLength(fileLength)
      }
      blockSize = fileLength / threadCount
      logger.info(s"file name: $fileName, file length: $fileLength, each block: ${blockSize / 1024 / 1024}MB")
      for (actorId <- 1L to threadCount) {
        val startPos: Long = (actorId - 1) * blockSize
        var endPos: Long = (actorId * blockSize) - 1
        if (threadCount == actorId) {
          endPos = fileLength
        }
        system ! StartDownloadTask(actorId, System.currentTimeMillis(), fileLength, startPos, endPos, fileUrl)
      }
    } else {
      logger.error(s"network error or file not found: ${connection.getResponseCode}")
    }
    connection.closeConnect()
  }

}


2.下载

package io.github.dreamylost.impl.actor

import java.io.{ File, FileWriter, RandomAccessFile }
import java.lang
import java.net.URL

import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import com.typesafe.scalalogging.LazyLogging
import io.github.dreamylost.Constants.{ savePath, _ }
import io.github.dreamylost.FileDownload._
import io.github.dreamylost.FileUtils.{ reader, usingIgnore }

/**
 * 文件下载
 *
 * @author liguobin@growingio.com
 * @version 1.0,2020/5/12
 */
object FileDownloadActor extends LazyLogging {


  sealed trait FileTask

  final case class DownloadTask(actorId: Long, taskBeginTime: Long, fileTotalLength: Long, startPos: Long, endPos: Long,
    url: String, replyTo: ActorRef[Result]) extends FileTask

  final case class StartDownloadTask(actorId: Long, taskBeginTime: Long, fileTotalLength: Long, startPos: Long,
    endPos: Long, url: String) extends FileTask

  final case class ShutdownSystem(msg: Option[String]) extends FileTask

  sealed trait Result

  final case class DownloadResult(actorId: Long, perBlockLength: Long, startTime: Long,
    endTime: Long, msg: Option[String] = None, from: ActorRef[FileTask]) extends Result

  final case class DownloadErrorResult(actorId: Long, taskBeginTime: Long, fileTotalLength: Long, startPos: Long, endPos: Long,
    url: String, error: Option[String] = None, replyTo: ActorRef[FileTask]) extends Result

  final case class DownloadDoneResult(taskBeginTime: Long, fileTotalLength: Long, from: ActorRef[FileTask]) extends Result

  final case class DownloadDoingResult(actorId: Long, speedOfProgress: String, from: ActorRef[FileTask]) extends Result

  //下载actor
  def apply(): Behavior[FileTask] = Behaviors.receive { (context, message) =>
    message match {
      case task@DownloadTask(actorId, taskBeginTime, fileTotalLength, startPos, endPos, url, replyTo) =>
        logger.info(s"receive download task: \n[$task]")
        val currentBlockSize: Long = endPos - startPos
        var realStart = startPos
        var total = 0L
        var totalFileLen = 0L

        val createFileUpdateTempPos = (tempFilePath: String, isPerRequest: Boolean) => {
          val file = new File(tempFilePath)
          if (file.exists()) {
            logger.info(s"open temp file: $tempFilePath")
            val saveStartPos = reader(file)
            val savePos = saveStartPos.trim
            if (isPerRequest) {
              if (saveStartPos != null && saveStartPos.length() > 0) {
                realStart = java.lang.Long.parseLong(savePos)
                total = realStart - startPos
              }
            } else {
              if (saveStartPos != null && saveStartPos.length() > 0) {
                totalFileLen = java.lang.Long.parseLong(savePos)
              }
            }
          }
        }

        val requestFilePartAndUpdateStartPos = () => {
          val connection = new URL(url).openConnection().asHttpUrlConnection()
          connection.setRequestMethod("GET")
          connection.setReadTimeout(durationToMillis())
          val tempFileSavePos = savePath + "/" + actorId + tmpSuffix
          createFileUpdateTempPos(tempFileSavePos, true)
          connection.setRequestProperty("Range", "bytes=" + realStart + "-" + endPos)
          (connection, tempFileSavePos)
        }

        val startTime: Long = System.currentTimeMillis()
        val downloadFileName = getDownloadFileName(url)
        val (connection, tempFile) = requestFilePartAndUpdateStartPos()
        try {
          if (connection.getResponseCode == 206) {
            usingIgnore(new RandomAccessFile(downloadFileName, "rwd")) { randomAccessFile =>
              randomAccessFile.seek(startPos)
              usingIgnore(connection.getInputStream) { is =>
                val buffer = new Array[Byte](calculationAndGetBufferSize())
                var len = -1
                var newStartPos = startPos
                while (total < currentBlockSize && (len = is.read(buffer)) != -1) {
                  total += len
                  randomAccessFile.write(buffer, 0, len)
                  val processPercent = new lang.Double(total / currentBlockSize.asInstanceOf[Double] * 100).formatted("%.2f") + "%"
                  replyTo ! DownloadDoingResult(actorId, processPercent, context.self)
                  newStartPos += len
                  val fileWriter = new FileWriter(tempFile, false)
                  fileWriter.write(String.valueOf(newStartPos))
                  fileWriter.flush()
                  fileWriter.close()
                }
                val totalTempPath = savePath + "/" + "totalLength" + tmpSuffix
                createFileUpdateTempPos(totalTempPath, false)
                totalFileLen += total
                logger.info(s"totalFileLen: $totalFileLen")
                val fileWriter1 = new FileWriter(totalTempPath, false)
                fileWriter1.write(String.valueOf(totalFileLen))
                fileWriter1.flush()
                fileWriter1.close()
                connection.closeConnect()
                if (totalFileLen == fileTotalLength) {
                  replyTo ! DownloadDoneResult(taskBeginTime, fileTotalLength, context.self)
                } else {
                  replyTo ! DownloadResult(actorId, currentBlockSize, startTime, System.currentTimeMillis(), msg = Some(s"actorId $actorId finished"), context.self)
                }
              }
            }
          }
          else {
            replyTo ! DownloadErrorResult(actorId, taskBeginTime, fileTotalLength, startPos, endPos, url, error = Some("response code is not 206"), context.self)
          }
        } catch {
          case exception: Exception => {
            logger.warn(exception.getLocalizedMessage)
            replyTo ! DownloadErrorResult(actorId, taskBeginTime, fileTotalLength, startPos, endPos, url, error = Some(exception.getLocalizedMessage), context.self)
          }
        }
        Behaviors.same
    }
  }

}


3.统计

package io.github.dreamylost.impl.actor

import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import com.typesafe.scalalogging.LazyLogging
import io.github.dreamylost.Constants._
import io.github.dreamylost.impl.actor.FileDownloadActor.{ DownloadTask, Result, ShutdownSystem }

/**
 * 接受下载反馈,记录下载速度,处理下载失败
 *
 * @author liguobin@growingio.com
 * @version 1.0,2020/5/12
 */
object FileDownloadAuditActor extends LazyLogging {

  private[this] lazy val retryMap = new java.util.HashMap[Long, Int]

  private[this] def printSpeed(fileSize: Long, start: Long, end: Long, prefix: String = "", suffix: String = ""): Unit = {
    try {
      val speed = fileSize.asInstanceOf[Double] / (end - start).asInstanceOf[Double]
      val s = speed * 1000 / 1024
      logger.info(s"$prefix: ${s.formatted("%.2f")} kb/s, cost time: ${end - start}ms")
      logger.info(suffix)
    } catch {
      case exception: Exception =>
        logger.error(exception.getLocalizedMessage)
    }
  }

  //计算速度和处理结果actor
  def apply(): Behavior[Result] = {
    Behaviors.receive { (context, message) =>
      message match {
        case FileDownloadActor.DownloadDoingResult(actorId, processPercent, _) =>
          logger.info(s"actorId $actorId - $processPercent complete now")
        case FileDownloadActor.DownloadResult(actorId, blockSize, startTime, endTime, msg, _) =>
          printSpeed(blockSize, startTime, endTime, prefix = s"actorId $actorId - speed", suffix = msg.getOrElse(s"actorId $actorId finished"))
          //简单重试
        case FileDownloadActor.DownloadErrorResult(actorId, taskBeginTime, fileTotalLength, startPos, endPos, url, error, replyTo) =>
          if (retryMap.containsKey(actorId)) {
            var count = retryMap.get(actorId)
            if (count < retryTimes) {
              logger.warn(s"error, retry $count times")
              replyTo ! DownloadTask(actorId, taskBeginTime, fileTotalLength, startPos, endPos, url, context.self)
              count += 1
              retryMap.put(actorId, count)
            } else {
              FileDownloadActorMain.system ! ShutdownSystem(Some(s"error, retry more than $retryTimes times: $error"))
            }
          } else {
            retryMap.put(actorId, 1)
            replyTo ! DownloadTask(actorId, taskBeginTime, fileTotalLength, startPos, endPos, url, context.self)
          }
        case FileDownloadActor.DownloadDoneResult(taskBeginTime, fileTotalLength, _) =>
          val success =
            """
              |  .-')                                       ('-.    .-')     .-')
              | ( OO ).                                   _(  OO)  ( OO ).  ( OO ).
              |(_)---\_) ,--. ,--.     .-----.   .-----. (,------.(_)---\_)(_)---\_)
              |/    _ |  |  | |  |    '  .--./  '  .--./  |  .---'/    _ | /    _ |
              |\  :` `.  |  | | .-')  |  |('-.  |  |('-.  |  |    \  :` `. \  :` `.
              | '..`''.) |  |_|( OO )/_) |OO  )/_) |OO  )(|  '--.  '..`''.) '..`''.)
              |.-._)   \ |  | | `-' /||  |`-'| ||  |`-'|  |  .--' .-._)   \.-._)   \
              |\       /('  '-'(_.-'(_'  '--'\(_'  '--'\  |  `---.\       /\       /
              | `-----'   `-----'      `-----'   `-----'  `------' `-----'  `-----'
              |""".stripMargin
          printSpeed(fileTotalLength, taskBeginTime, System.currentTimeMillis(), prefix = s"total speed: ", suffix = s"all task finished, download successfully\n $success")
          clearTempFiles()
          FileDownloadActorMain.system ! ShutdownSystem(None)
      }
      Behaviors.same
    }
  }
}

https://github.com/jxnu-liguobin/scala-download/blob/master/src/main/scala/io/github/dreamylost/impl/actor/FileDownloadAuditActor.scala


回帖