IT TIP

처리 도관 1 개, 동일한 유형의 IO 소스 2 개

itqueen 2020. 11. 28. 13:23
반응형

처리 도관 1 개, 동일한 유형의 IO 소스 2 개


GHC Haskellstm, network-conduit 및 conduit을 사용하는 응용 프로그램에서 .NET을 사용하여 자동으로 분기되는 각 소켓에 대한 가닥이 runTCPServer있습니다. 스트랜드는 방송 TChan을 사용하여 다른 스트랜드와 통신 할 수 있습니다.

이것은 내가 도관 "체인"을 설정하는 방법을 보여줍니다.

여기에 이미지 설명 입력

그래서 우리가 여기에있는 것은 (각각 헬퍼 도관에 묶인) 두 개의 소스입니다. Packet이것은 encoder수용하고로 변할 객체를 생성 한 ByteString다음 소켓을 보냅니다. 두 입력을 효율적으로 융합하는 데 (성능이 문제가 됨) 많은 어려움을 겪었습니다.

누군가 나를 올바른 방향으로 안내해 주시면 감사하겠습니다.


시도하지 않고이 질문을 게시하는 것은 무례 할 수 있으므로 이전에 시도한 내용을 여기에 넣겠습니다.

TMChan (닫을 수있는 채널)에서 소스를 생성 (차단)하는 함수를 작성 / 체리 선택했습니다.

-- | Takes a generic type of STM chan and, given read and close functionality,
--   returns a conduit 'Source' which consumes the elements of the channel.
chanSource 
    :: (MonadIO m, MonadSTM m)
    => a                    -- ^ The channel
    -> (a -> STM (Maybe b)) -- ^ The read function
    -> (a -> STM ())        -- ^ The close/finalizer function
    -> Source m b
chanSource ch readCh closeCh = ConduitM pull
    where close     = liftSTM $ closeCh ch
          pull      = PipeM $ liftSTM $ readCh ch >>= translate
          translate = return . maybe (Done ()) (HaveOutput pull close)

마찬가지로 Chan을 싱크로 변환하는 기능;

-- | Takes a stream and, given write and close functionality, returns a sink
--   which wil consume elements and broadcast them into the channel 
chanSink
    :: (MonadIO m, MonadSTM m)
    => a                 -- ^ The channel
    -> (a -> b -> STM()) -- ^ The write function
    -> (a -> STM())      -- ^ The close/finalizer function
    -> Sink b m ()
chanSink ch writeCh closeCh = ConduitM sink
    where close  = const . liftSTM $ closeCh ch
          sink   = NeedInput push close
          write  = liftSTM . writeCh ch
          push x = PipeM $ write x >> return sink

그런 다음 mergeSources는 간단합니다. fork 2 스레드 (내가 정말로하고 싶지 않지만 도대체 무엇인가)는 새로운 항목을 하나의 목록에 넣을 수 있으며 소스를 생성합니다.

-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns
--   a source which consumes the elements of the channel.
mergeSources
    :: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
    => [Source (ResourceT m) a]             -- ^ The list of sources
    -> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
    where push c s = s $$ chanSink c writeTMChan closeTMChan
          fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x
          retn c   = return $ chanSource c readTMChan closeTMChan

이 함수들을 타입 체크하는 데 성공했지만, 타입 체크를 위해이 기능들을 활용하는 데는 실패했습니다.

-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
    use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
    -- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
    mergsrc $$ protocol $= encoder =$ appSink appdata
    where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
          mergsrc = mergeSources [appSource appdata $= decoder, chansrc]

-- | Structure which holds mutable information for clients
data SessionState = SessionState
    { _ssBroadcast     :: TMChan Packet -- ^ Outbound packet broadcast channel
    }

makeLenses ''SessionState

-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)

-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO

이 방법은 어쨌든 결함이 있다고 생각합니다. 중간 목록과 변환이 많이 있습니다. 이것은 성능에 좋지 않습니다. 지침 구하기.


PS. From what I can understand, this is not a duplicate of; Fusing conduits with multiple inputs , as in my situation both sources produce the same type and I don't care from which source the Packet object is produced, as long as I'm not waiting on one while another has objects ready to be consumed.

PPS. I apologize for the usage (and therefore requirement of knowledge) of Lens in example code.


I don't know if it's any help, but I tried to implement Iain's suggestion and made a variant of mergeSources' that stops as soon as any of the channels does:

mergeSources' :: (MonadIO m, MonadBaseControl IO m)
              => [Source (ResourceT m) a] -- ^ The sources to merge.
              -> Int -- ^ The bound of the intermediate channel.
              -> ResourceT m (Source (ResourceT m) a)
mergeSources' sx bound = do
    c <- liftSTM $ newTBMChan bound
    mapM_ (\s -> resourceForkIO $
                    s $$ chanSink c writeTBMChan closeTBMChan) sx
    return $ sourceTBMChan c

(This simple addition is available here).

Some comments to your version of mergeSources (take them with a grain of salt, it can be I didn't understand something well):

  • Using ...TMChan instead of ...TBMChan seems dangerous. If the writers are faster than the reader, your heap will blow. Looking at your diagram it seems that this can easily happen, if your TCP peer doesn't read data fast enough. So I'd definitely use ...TBMChan with perhaps large but limited bound.
  • You don't need the MonadSTM m constraint. All STM stuff is wrapped into IO with

    liftSTM = liftIO . atomically
    

    Maybe this will help you slightly when using mergeSources' in serverApp.

  • Just a cosmetic issue, I found

    liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
    

    very hard to read due to its use of liftA2 on the (->) r monad. I'd say

    do
        c <- liftSTM newTMChan
        fsrc sx c
        retn c
    

    would be longer, but much easier to read.

Could you perhaps create a self-contained project where it would be possible to play with serverApp?

참고 URL : https://stackoverflow.com/questions/16757060/one-processing-conduit-2-io-sources-of-the-same-type

반응형